update buffers to use Tokio 0.3 MPSC channels #759

Merged
merged 5 commits on Dec 4, 2020
Changes from 3 commits
162 changes: 107 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -10,8 +10,9 @@ members = [
"linkerd/app/profiling",
"linkerd/app/test",
"linkerd/app",
"linkerd/cache",
"linkerd/buffer",
"linkerd/cache",
"linkerd/channel",
"linkerd/concurrency-limit",
"linkerd/conditional",
"linkerd/dns/name",
1 change: 1 addition & 0 deletions linkerd/buffer/Cargo.toml
@@ -7,6 +7,7 @@ publish = false

[dependencies]
futures = "0.3"
linkerd2-channel = { path = "../channel" }
linkerd2-error = { path = "../error" }
tokio = { version = "0.2", features = ["sync", "stream", "time", "macros"] }
tower = { version = "0.3", default_features = false, features = ["util"] }
17 changes: 9 additions & 8 deletions linkerd/buffer/src/dispatch.rs
@@ -1,9 +1,9 @@
use crate::error::{IdleError, ServiceError};
use crate::InFlight;
use futures::{prelude::*, select_biased};
use linkerd2_channel as mpsc;
use linkerd2_error::Error;
use std::sync::Arc;
use tokio::sync::mpsc;
use tower::util::ServiceExt;
use tracing::trace;

@@ -29,7 +29,7 @@ pub(crate) async fn run<S, Req, I>(
req = requests.recv().fuse() => {
match req {
None => return,
Some(InFlight { request, tx }) => {
Some(InFlight { request, tx, .. }) => {
hawkw marked this conversation as resolved.
match service.ready_and().await {
Ok(svc) => {
trace!("Dispatching request");
Expand All @@ -44,7 +44,7 @@ pub(crate) async fn run<S, Req, I>(
while let Some(InFlight { tx, .. }) = requests.recv().await {
let _ = tx.send(Err(error.clone().into()));
}
return;
break;
hawkw marked this conversation as resolved.
}
};
}
@@ -54,7 +54,7 @@ pub(crate) async fn run<S, Req, I>(
e = idle().fuse() => {
let error = ServiceError(Arc::new(e.into()));
trace!(%error, "Idling out inner service");
return;
break;
Member:

Suggested change: `break;` → `return;`

Member Author (hawkw):

way ahead of you, buddy e2046e3 :D

Member:

@hawkw i think that got the other break, but there were two.. so i think this is still relevant?

}
}
}
@@ -64,7 +64,7 @@ pub(crate) async fn run<S, Req, I>(
mod test {
use super::*;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;
use tokio::time::delay_for;
use tokio_test::{assert_pending, assert_ready, task};
use tower_test::mock;
@@ -101,12 +101,13 @@ mod test {
delay_for(max_idle).await;

// Send a request after the deadline has fired but before the
// dispatch future is polled. Ensure that the request is admitted, resetting idleness.
tx.try_send({
// dispatch future is polled. Ensure that the request is admitted,
// resetting idleness.
tx.send({
let (tx, _rx) = oneshot::channel();
super::InFlight { request: (), tx }
})
.ok()
.await
.expect("request not sent");

assert_pending!(dispatch.poll());
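
The test above now awaits `send(..)` instead of calling the synchronous `try_send(..).ok()`, because the new channel's `send` waits for capacity before enqueuing. Below is a minimal sketch of that pattern; it is not part of this diff and assumes a scratch crate that depends on the `linkerd2-channel` crate introduced later in this PR plus `tokio-test`.

```rust
use linkerd2_channel as mpsc;

fn main() {
    tokio_test::block_on(async {
        let (mut tx, mut rx) = mpsc::channel::<u32>(1);
        // `send` awaits channel capacity before enqueuing, unlike `try_send`,
        // which returns `Full` immediately when the buffer has no free slots.
        tx.send(42).await.expect("receiver is still alive");
        assert_eq!(rx.recv().await, Some(42));
    });
}
```
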
15 changes: 13 additions & 2 deletions linkerd/buffer/src/lib.rs
@@ -1,8 +1,9 @@
#![recursion_limit = "256"]

use linkerd2_channel as mpsc;
use linkerd2_error::Error;
use std::{future::Future, pin::Pin, time::Duration};
use tokio::sync::{mpsc, oneshot};
use std::{fmt, future::Future, pin::Pin, time::Duration};
use tokio::sync::oneshot;

mod dispatch;
pub mod error;
@@ -43,3 +44,13 @@ where
let dispatch = dispatch::run(inner, rx, idle);
(Buffer::new(tx), dispatch)
}

// Required so that `TrySendError`/`SendError` can be `expect`ed.
impl<Req, Rsp> fmt::Debug for InFlight<Req, Rsp> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("InFlight")
.field("request_type", &std::any::type_name::<Req>())
.field("response_type", &std::any::type_name::<Rsp>())
.finish()
}
}
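
A note on the `Debug` impl above: `Result::expect` is only available when the error type implements `fmt::Debug`, and, per the comment, the channel's `TrySendError`/`SendError` only implement `Debug` when the message type does. The impl therefore reports the request and response type names rather than requiring `Req: Debug` or `Rsp: Debug`. A hedged sketch of the same idea, using a hypothetical `Envelope` type in place of the buffer's `InFlight`:

```rust
use std::fmt;

// Hypothetical stand-in for `InFlight<Req, Rsp>`; illustrative only.
struct Envelope<Req>(Req);

// As in the PR: format the type name rather than requiring `Req: Debug`.
impl<Req> fmt::Debug for Envelope<Req> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Envelope")
            .field("request_type", &std::any::type_name::<Req>())
            .finish()
    }
}

fn main() {
    let (mut tx, _rx) = linkerd2_channel::channel::<Envelope<Vec<u8>>>(1);
    // `expect` requires `TrySendError<Envelope<Vec<u8>>>: Debug`, which holds
    // because `Envelope` implements `Debug` above.
    tx.try_send(Envelope(Vec::new())).expect("channel has capacity");
}
```
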
6 changes: 3 additions & 3 deletions linkerd/buffer/src/service.rs
@@ -1,9 +1,10 @@
use crate::error::Closed;
use crate::InFlight;
use linkerd2_channel as mpsc;
use linkerd2_error::Error;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;

pub struct Buffer<Req, Rsp> {
/// The queue on which in-flight requests are sent to the inner service.
@@ -27,14 +28,13 @@
type Future = Pin<Box<dyn Future<Output = Result<Rsp, Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tx.poll_ready(cx).map_err(|_| Closed(()).into())
self.tx.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, request: Req) -> Self::Future {
let (tx, rx) = oneshot::channel();
self.tx
.try_send(InFlight { request, tx })
.ok()
.expect("poll_ready must be called");
Box::pin(async move { rx.await.map_err(|_| Closed(()))??.await })
}
17 changes: 17 additions & 0 deletions linkerd/channel/Cargo.toml
@@ -0,0 +1,17 @@
[package]
name = "linkerd2-channel"
version = "0.1.0"
authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
edition = "2018"
publish = false
description = """
A bounded MPSC channel where senders expose a `poll_ready` method.
"""

[dependencies]
tokio = { version = "0.3", features = ["sync", "stream"] }
futures = "0.3"

[dev-dependencies]
tokio = { version = "0.3", features = ["sync", "stream", "macros"] }
tokio-test = "0.3"
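
The description above names the crate's one job: a bounded channel whose `Sender` can reserve capacity through `poll_ready` before sending, which is what `Buffer`'s Tower-style `poll_ready`/`call` split in `service.rs` relies on. A minimal usage sketch, not part of the diff, assuming a scratch crate with `linkerd2-channel`, `futures`, and `tokio-test` as dependencies:

```rust
use futures::future;

fn main() {
    tokio_test::block_on(async {
        let (mut tx, mut rx) = linkerd2_channel::channel::<String>(8);

        // Drive the sender to readiness; this acquires and stores a permit.
        future::poll_fn(|cx| tx.poll_ready(cx))
            .await
            .expect("receiver is alive");
        // With a permit already held, `try_send` cannot fail with `Full`.
        tx.try_send("hello".to_string()).expect("capacity was reserved");

        assert_eq!(rx.recv().await.as_deref(), Some("hello"));
    });
}
```
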
191 changes: 191 additions & 0 deletions linkerd/channel/src/lib.rs
@@ -0,0 +1,191 @@
use futures::{future, ready, Stream};
use std::sync::{Arc, Weak};
use std::task::{Context, Poll};
use std::{fmt, future::Future, mem, pin::Pin};
use tokio::sync::{mpsc, OwnedSemaphorePermit as Permit, Semaphore};

use self::error::{SendError, TrySendError};
pub use tokio::sync::mpsc::error;

/// Returns a new pollable, bounded MPSC channel.
///
/// Unlike `tokio::sync`'s `MPSC` channel, this channel exposes a `poll_ready`
/// function, at the cost of an allocation when driving it to readiness.
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
let semaphore = Arc::new(Semaphore::new(buffer));
let (tx, rx) = mpsc::unbounded_channel();
let rx = Receiver {
rx,
semaphore: Arc::downgrade(&semaphore),
buffer,
};
let tx = Sender {
tx,
semaphore,
state: State::Empty,
};
(tx, rx)
}

/// A bounded, pollable MPSC sender.
///
/// This is similar to Tokio's bounded MPSC channel's `Sender` type, except that
/// it exposes a `poll_ready` function, at the cost of an allocation when
/// driving it to readiness.
pub struct Sender<T> {
tx: mpsc::UnboundedSender<(T, Permit)>,
semaphore: Arc<Semaphore>,
state: State,
}

/// A bounded MPSC receiver.
///
/// This is similar to Tokio's bounded MPSC channel's `Receiver` type.
pub struct Receiver<T> {
rx: mpsc::UnboundedReceiver<(T, Permit)>,
semaphore: Weak<Semaphore>,
buffer: usize,
}

enum State {
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + Sync>>),
Acquired(Permit),
Empty,
}

impl<T> Sender<T> {
pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError<()>>> {
loop {
self.state = match self.state {
State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())),
State::Waiting(ref mut f) => State::Acquired(ready!(Pin::new(f).poll(cx))),
State::Acquired(_) if self.tx.is_closed() => {
return Poll::Ready(Err(SendError(())))
}
State::Acquired(_) => return Poll::Ready(Ok(())),
}
}
}

pub async fn ready(&mut self) -> Result<(), SendError<()>> {
future::poll_fn(|cx| self.poll_ready(cx)).await
}

pub fn try_send(&mut self, value: T) -> Result<(), TrySendError<T>> {
if self.tx.is_closed() {
return Err(TrySendError::Closed(value));
}
self.state = match mem::replace(&mut self.state, State::Empty) {
// Have we previously acquired a permit?
State::Acquired(permit) => {
self.send2(value, permit);
return Ok(());
}
// Okay, can we acquire a permit now?
State::Empty => {
if let Ok(permit) = self.semaphore.clone().try_acquire_owned() {
self.send2(value, permit);
return Ok(());
}
State::Empty
}
state => state,
};
Err(TrySendError::Full(value))
}

pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
if let Err(_) = self.ready().await {
return Err(SendError(value));
}
match mem::replace(&mut self.state, State::Empty) {
State::Acquired(permit) => {
self.send2(value, permit);
Ok(())
}
state => panic!("unexpected state after poll_ready: {:?}", state),
}
}

fn send2(&mut self, value: T, permit: Permit) {
self.tx.send((value, permit)).ok().expect("was not closed");
}
}

impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
semaphore: self.semaphore.clone(),
state: State::Empty,
}
}
}

impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Sender")
.field("message_type", &std::any::type_name::<T>())
.field("state", &self.state)
.field("semaphore", &self.semaphore)
.finish()
}
}

// === impl Receiver ===

impl<T> Receiver<T> {
pub async fn recv(&mut self) -> Option<T> {
self.rx.recv().await.map(|(t, _)| t)
}

pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
let res = ready!(Pin::new(&mut self.rx).poll_next(cx));
Poll::Ready(res.map(|(t, _)| t))
}
}

impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let res = ready!(Pin::new(&mut self.as_mut().rx).poll_next(cx));
Poll::Ready(res.map(|(t, _)| t))
}
}

impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if let Some(semaphore) = self.semaphore.upgrade() {
// Close the buffer by releasing any senders waiting on channel capacity.
// If more than `usize::MAX >> 3` permits are added to the semaphore, it
// will panic.
const MAX: usize = std::usize::MAX >> 4;
semaphore.add_permits(MAX - self.buffer - semaphore.available_permits());
}
}
}

impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Receiver")
.field("message_type", &std::any::type_name::<T>())
.field("semaphore", &self.semaphore)
.finish()
}
}

// === impl State ===

impl fmt::Debug for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(
match self {
State::Acquired(_) => "State::Acquired(..)",
State::Waiting(_) => "State::Waiting(..)",
State::Empty => "State::Empty",
},
f,
)
}
}
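
One subtlety worth noting in the code above is `Receiver`'s `Drop` impl: by releasing permits, it wakes senders blocked waiting for capacity so they can observe the closed channel rather than waiting forever. A hedged sketch of that behavior, under the same scratch-crate assumptions as the earlier examples:

```rust
fn main() {
    tokio_test::block_on(async {
        let (mut tx, rx) = linkerd2_channel::channel::<&'static str>(1);

        // Fill the single-slot buffer; the next try_send fails with `Full`.
        tx.try_send("first").expect("channel has capacity");
        assert!(tx.try_send("second").is_err());

        // Dropping the receiver releases permits via its `Drop` impl, so a
        // sender waiting for capacity is woken; it then sees the closed
        // channel and gets an error rather than hanging.
        drop(rx);
        assert!(tx.ready().await.is_err());
        assert!(tx.try_send("third").is_err());
    });
}
```
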