diff --git a/Cargo.lock b/Cargo.lock index a4dcfbbd90..c0545cbe7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -530,7 +530,7 @@ dependencies = [ "http 0.2.1", "indexmap", "slab", - "tokio", + "tokio 0.2.23", "tokio-util", "tracing", ] @@ -649,7 +649,7 @@ dependencies = [ "pin-project", "socket2", "time", - "tokio", + "tokio 0.2.23", "tower-service", "tracing", "want", @@ -663,7 +663,7 @@ dependencies = [ "http 0.2.1", "hyper", "pin-project", - "tokio", + "tokio 0.2.23", "tokio-test", "tower", ] @@ -823,7 +823,7 @@ dependencies = [ "linkerd2-error", "linkerd2-opencensus", "regex 1.3.9", - "tokio", + "tokio 0.2.23", "tonic", "tower", "tracing", @@ -883,7 +883,7 @@ dependencies = [ "procinfo", "prost-types", "regex 1.3.9", - "tokio", + "tokio 0.2.23", "tokio-timer", "tonic", "tower", @@ -902,7 +902,7 @@ dependencies = [ "linkerd2-app-core", "linkerd2-app-inbound", "linkerd2-app-outbound", - "tokio", + "tokio 0.2.23", "tokio-test", "tower", "tower-test", @@ -918,7 +918,7 @@ dependencies = [ "http 0.2.1", "indexmap", "linkerd2-app-core", - "tokio", + "tokio 0.2.23", "tower", "tracing", ] @@ -942,7 +942,7 @@ dependencies = [ "regex 0.1.80", "rustls", "socket2", - "tokio", + "tokio 0.2.23", "tokio-rustls", "tonic", "tower", @@ -968,7 +968,7 @@ dependencies = [ "linkerd2-io", "linkerd2-retry", "pin-project", - "tokio", + "tokio 0.2.23", "tower", "tracing", "tracing-futures", @@ -979,7 +979,7 @@ name = "linkerd2-app-profiling" version = "0.1.0" dependencies = [ "linkerd2-app-integration", - "tokio", + "tokio 0.2.23", ] [[package]] @@ -993,7 +993,7 @@ dependencies = [ "hyper", "linkerd2-app-core", "regex 0.1.80", - "tokio", + "tokio 0.2.23", "tokio-test", "tower", "tracing", @@ -1006,9 +1006,10 @@ name = "linkerd2-buffer" version = "0.1.0" dependencies = [ "futures 0.3.5", + "linkerd2-channel", "linkerd2-error", "pin-project", - "tokio", + "tokio 0.2.23", "tokio-test", "tower", "tower-test", @@ -1024,18 +1025,26 @@ dependencies = [ "linkerd2-error", "linkerd2-stack", "parking_lot", - "tokio", + "tokio 0.2.23", "tower", "tracing", ] +[[package]] +name = "linkerd2-channel" +version = "0.1.0" +dependencies = [ + "futures 0.3.5", + "tokio 0.3.5", +] + [[package]] name = "linkerd2-concurrency-limit" version = "0.1.0" dependencies = [ "futures 0.3.5", "pin-project", - "tokio", + "tokio 0.2.23", "tower", "tracing", ] @@ -1052,7 +1061,7 @@ dependencies = [ "linkerd2-dns-name", "linkerd2-error", "pin-project", - "tokio", + "tokio 0.2.23", "tracing", "trust-dns-resolver", ] @@ -1072,7 +1081,7 @@ dependencies = [ "futures 0.3.5", "linkerd2-error", "pin-project", - "tokio", + "tokio 0.2.23", "tokio-test", ] @@ -1083,7 +1092,7 @@ dependencies = [ "bytes 0.5.4", "futures 0.3.5", "pin-project", - "tokio", + "tokio 0.2.23", "tracing", ] @@ -1127,7 +1136,7 @@ dependencies = [ "pin-project", "quickcheck", "rand 0.7.2", - "tokio", + "tokio 0.2.23", ] [[package]] @@ -1193,7 +1202,7 @@ dependencies = [ "futures 0.3.5", "linkerd2-errno", "pin-project", - "tokio", + "tokio 0.2.23", "tokio-rustls", "tokio-test", ] @@ -1210,7 +1219,7 @@ dependencies = [ "indexmap", "parking_lot", "quickcheck", - "tokio", + "tokio 0.2.23", "tracing", ] @@ -1225,7 +1234,7 @@ dependencies = [ "linkerd2-metrics", "opencensus-proto", "pin-project", - "tokio", + "tokio 0.2.23", "tonic", "tower", "tracing", @@ -1240,7 +1249,7 @@ dependencies = [ "linkerd2-signal", "mimalloc", "num_cpus", - "tokio", + "tokio 0.2.23", "tracing", ] @@ -1295,11 +1304,12 @@ dependencies = [ "async-stream", "futures 0.3.5", "indexmap", + "linkerd2-channel", "linkerd2-error", "linkerd2-proxy-core", "linkerd2-stack", "pin-project", - "tokio", + "tokio 0.2.23", "tower", "tracing", "tracing-futures", @@ -1314,7 +1324,7 @@ dependencies = [ "linkerd2-dns", "linkerd2-error", "linkerd2-proxy-core", - "tokio", + "tokio 0.2.23", "tower", "tracing", "tracing-futures", @@ -1343,7 +1353,7 @@ dependencies = [ "linkerd2-timeout", "pin-project", "rand 0.7.2", - "tokio", + "tokio 0.2.23", "tower", "tracing", "tracing-futures", @@ -1361,7 +1371,7 @@ dependencies = [ "linkerd2-proxy-api", "linkerd2-proxy-transport", "pin-project", - "tokio", + "tokio 0.2.23", "tonic", "tracing", ] @@ -1398,7 +1408,7 @@ dependencies = [ "pin-project", "prost-types", "rand 0.7.2", - "tokio", + "tokio 0.2.23", "tonic", "tower", "tracing", @@ -1415,7 +1425,7 @@ dependencies = [ "linkerd2-stack", "pin-project", "rand 0.7.2", - "tokio", + "tokio 0.2.23", "tower", ] @@ -1438,7 +1448,7 @@ dependencies = [ "linkerd2-stack", "pin-project", "rustls", - "tokio", + "tokio 0.2.23", "tokio-rustls", "tokio-util", "tower", @@ -1492,7 +1502,7 @@ dependencies = [ "quickcheck", "rand 0.7.2", "regex 1.3.9", - "tokio", + "tokio 0.2.23", "tonic", "tower", "tracing", @@ -1503,7 +1513,7 @@ dependencies = [ name = "linkerd2-signal" version = "0.1.0" dependencies = [ - "tokio", + "tokio 0.2.23", "tracing", ] @@ -1515,7 +1525,7 @@ dependencies = [ "futures 0.3.5", "linkerd2-error", "pin-project", - "tokio", + "tokio 0.2.23", "tokio-test", "tower", "tower-test", @@ -1553,7 +1563,7 @@ dependencies = [ "linkerd2-error", "linkerd2-stack", "pin-project", - "tokio", + "tokio 0.2.23", "tokio-connect", "tokio-test", "tower", @@ -1573,7 +1583,7 @@ dependencies = [ "linkerd2-error", "linkerd2-stack", "rand 0.7.2", - "tokio", + "tokio 0.2.23", "tower", "tracing", ] @@ -1941,6 +1951,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "237844750cfbb86f67afe27eee600dfbbcb6188d734139b534cbfbf4f96792ae" +[[package]] +name = "pin-project-lite" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" + [[package]] name = "pin-utils" version = "0.1.0" @@ -2585,7 +2601,7 @@ dependencies = [ "mio-uds", "num_cpus", "parking_lot", - "pin-project-lite", + "pin-project-lite 0.1.4", "signal-hook-registry", "slab", "tokio-macros", @@ -2593,6 +2609,17 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "tokio" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a12a3eb39ee2c231be64487f1fcbe726c8f2514876a55480a5ab8559fc374252" +dependencies = [ + "autocfg 1.0.0", + "futures-core", + "pin-project-lite 0.2.0", +] + [[package]] name = "tokio-connect" version = "0.1.0" @@ -2642,7 +2669,7 @@ checksum = "e12831b255bcfa39dc0436b01e19fea231a37db570686c06ee72c423479f889a" dependencies = [ "futures-core", "rustls", - "tokio", + "tokio 0.2.23", "webpki", ] @@ -2654,7 +2681,7 @@ checksum = "ed0049c119b6d505c4447f5c64873636c7af6c75ab0d45fd9f618d82acb8016d" dependencies = [ "bytes 0.5.4", "futures-core", - "tokio", + "tokio 0.2.23", ] [[package]] @@ -2676,7 +2703,7 @@ source = "git+https://github.com/hawkw/tokio-trace?rev=a8240c5cbb4ff981def84920d dependencies = [ "num_cpus", "serde", - "tokio", + "tokio 0.2.23", "tracing-core", "tracing-subscriber", ] @@ -2692,8 +2719,8 @@ dependencies = [ "futures-io", "futures-sink", "log", - "pin-project-lite", - "tokio", + "pin-project-lite 0.1.4", + "tokio 0.2.23", ] [[package]] @@ -2743,7 +2770,7 @@ dependencies = [ "pin-project", "rand 0.7.2", "slab", - "tokio", + "tokio 0.2.23", "tower-layer 0.3.0 (git+https://github.com/tower-rs/tower?rev=ad348d8)", "tower-service", "tracing", @@ -2766,7 +2793,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce50370d644a0364bf4877ffd4f76404156a248d104e2cc234cd391ea5cdc965" dependencies = [ - "tokio", + "tokio 0.2.23", "tower-service", ] @@ -2793,7 +2820,7 @@ checksum = "9ba4bbc2c1e4a8543c30d4c13a4c8314ed72d6e07581910f665aa13fde0153c8" dependencies = [ "futures-util", "pin-project", - "tokio", + "tokio 0.2.23", "tokio-test", "tower-layer 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "tower-service", @@ -2902,7 +2929,7 @@ dependencies = [ "rand 0.7.2", "smallvec", "thiserror", - "tokio", + "tokio 0.2.23", "url", ] @@ -2921,7 +2948,7 @@ dependencies = [ "resolv-conf", "smallvec", "thiserror", - "tokio", + "tokio 0.2.23", "trust-dns-proto", ] diff --git a/Cargo.toml b/Cargo.toml index e625d717e1..17c7d2d405 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,9 @@ members = [ "linkerd/app/profiling", "linkerd/app/test", "linkerd/app", - "linkerd/cache", "linkerd/buffer", + "linkerd/cache", + "linkerd/channel", "linkerd/concurrency-limit", "linkerd/conditional", "linkerd/dns/name", diff --git a/linkerd/buffer/Cargo.toml b/linkerd/buffer/Cargo.toml index 804ade0348..453c3ceff9 100644 --- a/linkerd/buffer/Cargo.toml +++ b/linkerd/buffer/Cargo.toml @@ -7,6 +7,7 @@ publish = false [dependencies] futures = "0.3" +linkerd2-channel = { path = "../channel" } linkerd2-error = { path = "../error" } tokio = { version = "0.2", features = ["sync", "stream", "time", "macros"] } tower = { version = "0.3", default_features = false, features = ["util"] } diff --git a/linkerd/buffer/src/dispatch.rs b/linkerd/buffer/src/dispatch.rs index 26ec113ff0..76f10f1faf 100644 --- a/linkerd/buffer/src/dispatch.rs +++ b/linkerd/buffer/src/dispatch.rs @@ -1,9 +1,9 @@ use crate::error::{IdleError, ServiceError}; use crate::InFlight; use futures::{prelude::*, select_biased}; +use linkerd2_channel as mpsc; use linkerd2_error::Error; use std::sync::Arc; -use tokio::sync::mpsc; use tower::util::ServiceExt; use tracing::trace; @@ -29,7 +29,7 @@ pub(crate) async fn run( req = requests.recv().fuse() => { match req { None => return, - Some(InFlight { request, tx }) => { + Some(InFlight { request, tx, .. }) => { match service.ready_and().await { Ok(svc) => { trace!("Dispatching request"); @@ -44,7 +44,7 @@ pub(crate) async fn run( while let Some(InFlight { tx, .. }) = requests.recv().await { let _ = tx.send(Err(error.clone().into())); } - return; + break; } }; } @@ -54,7 +54,7 @@ pub(crate) async fn run( e = idle().fuse() => { let error = ServiceError(Arc::new(e.into())); trace!(%error, "Idling out inner service"); - return; + break; } } } @@ -64,7 +64,7 @@ pub(crate) async fn run( mod test { use super::*; use std::time::Duration; - use tokio::sync::{mpsc, oneshot}; + use tokio::sync::oneshot; use tokio::time::delay_for; use tokio_test::{assert_pending, assert_ready, task}; use tower_test::mock; @@ -101,12 +101,13 @@ mod test { delay_for(max_idle).await; // Send a request after the deadline has fired but before the - // dispatch future is polled. Ensure that the request is admitted, resetting idleness. - tx.try_send({ + // dispatch future is polled. Ensure that the request is admitted, + // resetting idleness. + tx.send({ let (tx, _rx) = oneshot::channel(); super::InFlight { request: (), tx } }) - .ok() + .await .expect("request not sent"); assert_pending!(dispatch.poll()); diff --git a/linkerd/buffer/src/lib.rs b/linkerd/buffer/src/lib.rs index 2fe62c797a..caae4aa6e8 100644 --- a/linkerd/buffer/src/lib.rs +++ b/linkerd/buffer/src/lib.rs @@ -1,8 +1,9 @@ #![recursion_limit = "256"] +use linkerd2_channel as mpsc; use linkerd2_error::Error; use std::{future::Future, pin::Pin, time::Duration}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; mod dispatch; pub mod error; diff --git a/linkerd/buffer/src/service.rs b/linkerd/buffer/src/service.rs index a6b9df64c7..f72dd09392 100644 --- a/linkerd/buffer/src/service.rs +++ b/linkerd/buffer/src/service.rs @@ -1,9 +1,10 @@ use crate::error::Closed; use crate::InFlight; +use linkerd2_channel as mpsc; use linkerd2_error::Error; use std::task::{Context, Poll}; use std::{future::Future, pin::Pin}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; pub struct Buffer { /// The queue on which in-flight requests are sent to the inner service. @@ -27,14 +28,13 @@ where type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.tx.poll_ready(cx).map_err(|_| Closed(()).into()) + self.tx.poll_ready(cx).map_err(Into::into) } fn call(&mut self, request: Req) -> Self::Future { let (tx, rx) = oneshot::channel(); self.tx .try_send(InFlight { request, tx }) - .ok() .expect("poll_ready must be called"); Box::pin(async move { rx.await.map_err(|_| Closed(()))??.await }) } diff --git a/linkerd/channel/Cargo.toml b/linkerd/channel/Cargo.toml new file mode 100644 index 0000000000..af02d2fd27 --- /dev/null +++ b/linkerd/channel/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "linkerd2-channel" +version = "0.1.0" +authors = ["Linkerd Developers "] +edition = "2018" +publish = false +description = """ +A bounded MPSC channel where senders expose a `poll_ready` method. +""" + +[dependencies] +tokio = { version = "0.3", features = ["sync", "stream"]} +futures = "0.3" \ No newline at end of file diff --git a/linkerd/channel/src/lib.rs b/linkerd/channel/src/lib.rs new file mode 100644 index 0000000000..00d52c31fd --- /dev/null +++ b/linkerd/channel/src/lib.rs @@ -0,0 +1,201 @@ +use futures::{future, ready, Stream}; +use std::sync::{Arc, Weak}; +use std::task::{Context, Poll}; +use std::{fmt, future::Future, mem, pin::Pin}; +use tokio::sync::{mpsc, OwnedSemaphorePermit as Permit, Semaphore}; + +/// Returns a new pollable, bounded MPSC channel. +/// +/// Unlike `tokio::sync`'s `MPSC` channel, this channel exposes a `poll_ready` +/// function, at the cost of an allocation when driving it to readiness. +pub fn channel(buffer: usize) -> (Sender, Receiver) { + assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); + let semaphore = Arc::new(Semaphore::new(buffer)); + let (tx, rx) = mpsc::unbounded_channel(); + let rx = Receiver { + rx, + semaphore: Arc::downgrade(&semaphore), + buffer, + }; + let tx = Sender { + tx, + semaphore, + state: State::Empty, + }; + (tx, rx) +} + +#[derive(Debug)] +pub struct Sender { + tx: mpsc::UnboundedSender<(T, Permit)>, + semaphore: Arc, + state: State, +} + +#[derive(Debug)] +pub struct Receiver { + rx: mpsc::UnboundedReceiver<(T, Permit)>, + semaphore: Weak, + buffer: usize, +} + +pub enum SendError { + AtCapacity(T), + Closed(T, Closed), +} + +enum State { + Waiting(Pin + Send + Sync>>), + Acquired(Permit), + Empty, +} + +#[derive(Copy, Clone, Debug)] +pub struct Closed(pub(crate) ()); + +// === impl Sender === + +impl Sender { + pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + self.state = match self.state { + State::Empty => State::Waiting(Box::pin(self.semaphore.clone().acquire_owned())), + State::Waiting(ref mut f) => State::Acquired(ready!(Pin::new(f).poll(cx))), + State::Acquired(_) if self.tx.is_closed() => return Poll::Ready(Err(Closed(()))), + State::Acquired(_) => return Poll::Ready(Ok(())), + } + } + } + + pub async fn ready(&mut self) -> Result<(), Closed> { + future::poll_fn(|cx| self.poll_ready(cx)).await + } + + pub fn try_send(&mut self, value: T) -> Result<(), SendError> { + if self.tx.is_closed() { + return Err(SendError::Closed(value, Closed(()))); + } + self.state = match mem::replace(&mut self.state, State::Empty) { + State::Acquired(_permit) => { + self.tx.send((value, _permit)).ok().expect("was not closed"); + return Ok(()); + } + state => state, + }; + Err(SendError::AtCapacity(value)) + } + + pub async fn send(&mut self, value: T) -> Result<(), Closed> { + self.ready().await?; + match mem::replace(&mut self.state, State::Empty) { + State::Acquired(_permit) => { + self.tx.send((value, _permit)).ok().expect("was not closed"); + Ok(()) + } + state => panic!("unexpected state after poll_ready: {:?}", state), + } + } +} + +impl Clone for Sender { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + semaphore: self.semaphore.clone(), + state: State::Empty, + } + } +} + +// === impl Receiver === + +impl Receiver { + pub async fn recv(&mut self) -> Option { + self.rx.recv().await.map(|(t, _)| t) + } + + pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { + let res = ready!(Pin::new(&mut self.rx).poll_next(cx)); + Poll::Ready(res.map(|(t, _)| t)) + } +} + +impl Stream for Receiver { + type Item = T; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let res = ready!(Pin::new(&mut self.as_mut().rx).poll_next(cx)); + Poll::Ready(res.map(|(t, _)| t)) + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + if let Some(semaphore) = self.semaphore.upgrade() { + // Close the buffer by releasing any senders waiting on channel capacity. + // If more than `usize::MAX >> 3` permits are added to the semaphore, it + // will panic. + const MAX: usize = std::usize::MAX >> 4; + semaphore.add_permits(MAX - self.buffer - semaphore.available_permits()); + } + } +} +// === impl State === + +impl fmt::Debug for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt( + match self { + State::Acquired(_) => "State::Acquired(..)", + State::Waiting(_) => "State::Waiting(..)", + State::Empty => "State::Empty", + }, + f, + ) + } +} + +// === impl SendError === + +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SendError::AtCapacity(_) => f + .debug_tuple("SendError::AtCapacity") + .field(&format_args!("..")) + .finish(), + SendError::Closed(_, c) => f + .debug_tuple("SendError::Closed") + .field(c) + .field(&format_args!("..")) + .finish(), + } + } +} + +impl std::fmt::Display for SendError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SendError::AtCapacity(_) => fmt::Display::fmt("channel at capacity", f), + SendError::Closed(_, _) => fmt::Display::fmt("channel closed", f), + } + } +} + +impl std::error::Error for SendError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + SendError::Closed(_, c) => Some(c), + _ => None, + } + } +} + +// === impl Closed === + +impl std::fmt::Display for Closed { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "closed") + } +} + +impl std::error::Error for Closed {} diff --git a/linkerd/proxy/discover/Cargo.toml b/linkerd/proxy/discover/Cargo.toml index 3c0941bee5..8c673fe481 100644 --- a/linkerd/proxy/discover/Cargo.toml +++ b/linkerd/proxy/discover/Cargo.toml @@ -11,6 +11,7 @@ Utilities to implement a Discover with the core Resolve type [dependencies] futures = "0.3" +linkerd2-channel = { path = "../../channel" } linkerd2-error = { path = "../../error" } linkerd2-proxy-core = { path = "../core" } linkerd2-stack = { path = "../../stack" } diff --git a/linkerd/proxy/discover/src/buffer.rs b/linkerd/proxy/discover/src/buffer.rs index 0dcc860912..dc6d49c197 100644 --- a/linkerd/proxy/discover/src/buffer.rs +++ b/linkerd/proxy/discover/src/buffer.rs @@ -1,11 +1,12 @@ use futures::{ready, Stream, TryFuture}; +use linkerd2_channel as mpsc; use linkerd2_error::{Error, Never}; use pin_project::pin_project; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use tokio::time::{self, Delay}; use tower::discover; use tracing::warn;