Switch from futures::sync to tokio-sync

jonhoo committed Jan 23, 2019
1 parent 17d8e8d commit acbca13
Showing 13 changed files with 122 additions and 78 deletions.
14 changes: 13 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -16,3 +16,4 @@ tower-discover = { git = "https://github.com/jonhoo/tower.git", branch = "dynami
tower-balance = { git = "https://github.com/jonhoo/tower.git", branch = "dynamic-balance" }
tower-buffer = { git = "https://github.com/jonhoo/tower.git", branch = "dynamic-balance" }
tower-util = { git = "https://github.com/jonhoo/tower.git", branch = "dynamic-balance" }
tokio-sync = { git = "https://github.com/jonhoo/tokio.git", branch = "debug-sender-no-t-debug" }
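
Worth noting: tokio-sync is pulled from a fork of tokio rather than crates.io. The branch name, debug-sender-no-t-debug, suggests the fork relaxes the Debug implementation on the mpsc sender so that it no longer requires `T: Debug`; several payloads sent through these channels below (boxed migration closures, `Event`) have no Debug impl. A hypothetical illustration of the constraint this would lift, using a fictional `ReplayHandle` type:

```rust
extern crate tokio_sync;

// Hypothetical type: `Box<dyn FnMut() + Send>` is not Debug, so deriving
// Debug here only compiles if `UnboundedSender`'s Debug impl does not
// bound `T: Debug`, which is presumably what the forked branch provides.
#[derive(Debug)]
struct ReplayHandle {
    tx: tokio_sync::mpsc::UnboundedSender<Box<dyn FnMut() + Send>>,
}

fn main() {
    let (tx, _rx) = tokio_sync::mpsc::unbounded_channel();
    println!("{:?}", ReplayHandle { tx });
}
```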
1 change: 1 addition & 0 deletions noria-server/Cargo.toml
@@ -35,6 +35,7 @@ tokio = "0.1"
async-bincode = "0.4.5"
tokio-threadpool = "0.1"
tokio-io-pool = "0.1.1"
tokio-sync = "0.1"
streamunordered = "0.4.0"
bufstream = { version = "0.1.3", features = [ "tokio" ] }
stream-cancel = "0.4"
1 change: 1 addition & 0 deletions noria-server/dataflow/Cargo.toml
@@ -19,6 +19,7 @@ serde_json = "1.0.2"
slog = "2.4.0"
stream-cancel = "0.4"
tokio = "0.1"
tokio-sync = "0.1"
vec_map = { version = "0.8.0", features = ["eders"] }
hyper = "0.12.0"
tempfile = "3.0.2"
60 changes: 39 additions & 21 deletions noria-server/dataflow/src/backlog/mod.rs
@@ -20,15 +20,15 @@ pub(crate) fn new_partial<F>(
trigger: F,
) -> (SingleReadHandle, WriteHandle)
where
F: Fn(&[DataType]) -> bool + 'static + Send + Sync,
F: Fn() -> Box<FnMut(&[DataType]) -> bool + 'static + Send + Sync> + 'static + Send + Sync,
{
new_inner(cols, key, Some(Arc::new(trigger)))
}

fn new_inner(
cols: usize,
key: &[usize],
trigger: Option<Arc<Fn(&[DataType]) -> bool + Send + Sync>>,
mk_trigger: Option<Arc<Fn() -> Box<FnMut(&[DataType]) -> bool + Send + Sync> + Send + Sync>>,
) -> (SingleReadHandle, WriteHandle) {
let contiguous = {
let mut contiguous = true;
@@ -65,16 +65,18 @@
};

let w = WriteHandle {
partial: trigger.is_some(),
partial: mk_trigger.is_some(),
handle: w,
key: Vec::from(key),
cols: cols,
contiguous,
mem_size: 0,
};
let trigger = mk_trigger.as_ref().map(|mk| mk());
let r = SingleReadHandle {
handle: r,
trigger: trigger,
mk_trigger,
trigger,
key: Vec::from(key),
};

@@ -293,23 +295,34 @@ impl SizeOf for WriteHandle {
}

/// Handle to get the state of a single shard of a reader.
#[derive(Clone)]
pub struct SingleReadHandle {
handle: multir::Handle,
trigger: Option<Arc<Fn(&[DataType]) -> bool + Send + Sync>>,
mk_trigger: Option<Arc<Fn() -> Box<FnMut(&[DataType]) -> bool + Send + Sync> + Send + Sync>>,
trigger: Option<Box<FnMut(&[DataType]) -> bool + Send + Sync>>,
key: Vec<usize>,
}

impl Clone for SingleReadHandle {
fn clone(&self) -> Self {
SingleReadHandle {
handle: self.handle.clone(),
mk_trigger: self.mk_trigger.clone(),
trigger: self.mk_trigger.as_ref().map(|f| f()),
key: self.key.clone(),
}
}
}

impl SingleReadHandle {
/// Trigger a replay of a missing key from a partially materialized view.
pub fn trigger(&self, key: &[DataType]) -> bool {
pub fn trigger(&mut self, key: &[DataType]) -> bool {
assert!(
self.trigger.is_some(),
"tried to trigger a replay for a fully materialized view"
);

// trigger a replay to populate
(*self.trigger.as_ref().unwrap())(key)
(*self.trigger.as_mut().unwrap())(key)
}

/// Find all entries that matched the given conditions.
@@ -320,7 +333,11 @@ impl SingleReadHandle {
/// swapped in by the writer.
///
/// Holes in partially materialized state are returned as `Ok((None, _))`.
pub fn try_find_and<F, T>(&self, key: &[DataType], mut then: F) -> Result<(Option<T>, i64), ()>
pub fn try_find_and<F, T>(
&mut self,
key: &[DataType],
mut then: F,
) -> Result<(Option<T>, i64), ()>
where
F: FnMut(&[Vec<DataType>]) -> T,
{
@@ -365,19 +382,20 @@ impl ReadHandle {
/// swapped in by the writer.
///
/// A hole in partially materialized state is returned as `Ok((None, _))`.
pub fn try_find_and<F, T>(&self, key: &[DataType], then: F) -> Result<(Option<T>, i64), ()>
pub fn try_find_and<F, T>(&mut self, key: &[DataType], then: F) -> Result<(Option<T>, i64), ()>
where
F: FnMut(&[Vec<DataType>]) -> T,
{
match *self {
ReadHandle::Sharded(ref shards) => {
ReadHandle::Sharded(ref mut shards) => {
assert_eq!(key.len(), 1);
shards[::shard_by(&key[0], shards.len())]
.as_ref()
let nshards = shards.len();
(&mut shards[::shard_by(&key[0], nshards)])
.as_mut()
.unwrap()
.try_find_and(key, then)
}
ReadHandle::Singleton(ref srh) => srh.as_ref().unwrap().try_find_and(key, then),
ReadHandle::Singleton(ref mut srh) => srh.as_mut().unwrap().try_find_and(key, then),
}
}

@@ -420,7 +438,7 @@ mod tests {
fn store_works() {
let a = vec![1.into(), "a".into()];

let (r, mut w) = new(2, &[0]);
let (mut r, mut w) = new(2, &[0]);

// initially, store is uninitialized
assert_eq!(r.try_find_and(&a[0..1], |rs| rs.len()), Err(()));
@@ -453,7 +471,7 @@
use std::thread;

let n = 10000;
let (r, mut w) = new(1, &[0]);
let (mut r, mut w) = new(1, &[0]);
thread::spawn(move || {
for i in 0..n {
w.add(vec![Record::Positive(vec![i.into()])]);
@@ -479,7 +497,7 @@
let a = vec![1.into(), "a".into()];
let b = vec![1.into(), "b".into()];

let (r, mut w) = new(2, &[0]);
let (mut r, mut w) = new(2, &[0]);
w.add(vec![Record::Positive(a.clone())]);
w.swap();
w.add(vec![Record::Positive(b.clone())]);
@@ -500,7 +518,7 @@
let b = vec![1.into(), "b".into()];
let c = vec![1.into(), "c".into()];

let (r, mut w) = new(2, &[0]);
let (mut r, mut w) = new(2, &[0]);
w.add(vec![Record::Positive(a.clone())]);
w.add(vec![Record::Positive(b.clone())]);
w.swap();
@@ -528,7 +546,7 @@
let a = vec![1.into(), "a".into()];
let b = vec![1.into(), "b".into()];

let (r, mut w) = new(2, &[0]);
let (mut r, mut w) = new(2, &[0]);
w.add(vec![Record::Positive(a.clone())]);
w.add(vec![Record::Positive(b.clone())]);
w.add(vec![Record::Negative(a.clone())]);
@@ -549,7 +567,7 @@
let a = vec![1.into(), "a".into()];
let b = vec![1.into(), "b".into()];

let (r, mut w) = new(2, &[0]);
let (mut r, mut w) = new(2, &[0]);
w.add(vec![Record::Positive(a.clone())]);
w.add(vec![Record::Positive(b.clone())]);
w.swap();
@@ -572,7 +590,7 @@
let b = vec![1.into(), "b".into()];
let c = vec![1.into(), "c".into()];

let (r, mut w) = new(2, &[0]);
let (mut r, mut w) = new(2, &[0]);
w.add(vec![
Record::Positive(a.clone()),
Record::Positive(b.clone()),
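The trigger plumbing above changes shape because of sender semantics: futures' UnboundedSender could sit behind a shared `Arc<Fn(..)>` since `unbounded_send` took `&self`, but tokio-sync's `try_send` takes `&mut self`. Each `SingleReadHandle` therefore owns a private `Box<FnMut(..)>` trigger, and keeps the `mk_trigger` factory around so the hand-written `Clone` impl can mint a fresh trigger (with freshly cloned senders) per clone. A minimal, self-contained sketch of that pattern, with simplified types and an `Arc<Mutex<_>>` standing in for the real replay channels:

```rust
use std::sync::{Arc, Mutex};

type Trigger = Box<dyn FnMut(&[i32]) -> bool + Send + Sync>;
type MkTrigger = Arc<dyn Fn() -> Trigger + Send + Sync>;

struct ReadHandle {
    mk_trigger: MkTrigger,
    trigger: Trigger, // private to this handle, so calling it mutably is fine
}

impl Clone for ReadHandle {
    fn clone(&self) -> Self {
        ReadHandle {
            mk_trigger: self.mk_trigger.clone(),
            // each clone gets its own freshly minted trigger
            trigger: (self.mk_trigger)(),
        }
    }
}

fn main() {
    // stand-in for the per-shard replay senders
    let log = Arc::new(Mutex::new(Vec::new()));
    let mk: MkTrigger = {
        let log = log.clone();
        Arc::new(move || {
            let log = log.clone();
            Box::new(move |miss: &[i32]| {
                log.lock().unwrap().push(miss.to_vec());
                true // "replay triggered"
            }) as Trigger
        })
    };
    let trigger = mk();
    let handle = ReadHandle { mk_trigger: mk, trigger };
    let mut clone = handle.clone();
    assert!((clone.trigger)(&[1, 2]));
    assert_eq!(log.lock().unwrap().len(), 1);
}
```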
31 changes: 18 additions & 13 deletions noria-server/dataflow/src/domain/mod.rs
@@ -9,7 +9,6 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time;

use futures;
use group_commit::GroupCommitQueueSet;
use noria::channel::{self, TcpSender};
pub use noria::internal::DomainIndex as Index;
@@ -826,7 +825,7 @@ impl Domain {
let txs = (0..shards)
.map(|shard| {
let key = key.clone();
let (tx, rx) = futures::sync::mpsc::unbounded();
let (tx, rx) = tokio_sync::mpsc::unbounded_channel();
let sender = self
.channel_coordinator
.builder_for(&(trigger_domain, shard))
@@ -836,7 +835,10 @@

tokio::spawn(
self.shutdown_valve
.wrap(rx)
.wrap(rx.map_err(|_| {
// NOTE: what kind of error would this even be?
()
}))
.map(move |miss| box Packet::RequestReaderReplay {
key: miss,
cols: key.clone(),
@@ -857,16 +859,19 @@
})
.collect::<Vec<_>>();
let (r_part, w_part) =
backlog::new_partial(cols, &k[..], move |miss| {
let n = txs.len();
let tx = if n == 1 {
&txs[0]
} else {
// TODO: compound reader
assert_eq!(miss.len(), 1);
&txs[::shard_by(&miss[0], n)]
};
tx.unbounded_send(Vec::from(miss)).is_ok()
backlog::new_partial(cols, &k[..], move || {
let mut txs = txs.clone();
Box::new(move |miss: &[DataType]| {
let n = txs.len();
let tx = if n == 1 {
&mut txs[0]
} else {
// TODO: compound reader
assert_eq!(miss.len(), 1);
&mut txs[::shard_by(&miss[0], n)]
};
tx.try_send(Vec::from(miss)).is_ok()
})
});

let mut n = self.nodes[node].borrow_mut();
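Two API differences drive the mechanical edits in this file, assuming the futures 0.1 and tokio-sync 0.1 APIs this codebase pins: the tokio-sync receiver has its own stream error type (hence the `map_err` to `()` before the shutdown valve), and sending requires a mutable sender (hence `mut txs` and `try_send` where `unbounded_send` only needed `&self`). A minimal sketch of both points, not noria code:

```rust
extern crate futures;
extern crate tokio;
extern crate tokio_sync;

use futures::{Future, Stream};

fn main() {
    // unbounded_channel() replaces futures::sync::mpsc::unbounded()
    let (mut tx, rx) = tokio_sync::mpsc::unbounded_channel();

    tokio::run(futures::lazy(move || {
        tokio::spawn(
            // the receiver's error type is tokio-sync's own, so map it
            // away before combinators that expect Error = ()
            rx.map_err(|_| ()).for_each(|miss| {
                println!("replaying {:?}", miss);
                Ok(())
            }),
        );
        // try_send takes &mut self, unlike unbounded_send
        assert!(tx.try_send(vec![1, 2, 3]).is_ok());
        Ok::<_, ()>(())
    }));
}
```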
1 change: 0 additions & 1 deletion noria-server/dataflow/src/lib.rs
@@ -10,7 +10,6 @@ extern crate bincode;
extern crate common;
extern crate evmap;
extern crate fnv;
extern crate futures;
extern crate itertools;
extern crate nom_sql;
extern crate noria;
16 changes: 8 additions & 8 deletions noria-server/src/controller/handle.rs
@@ -16,7 +16,7 @@ use tokio_io_pool;
pub struct WorkerHandle<A: Authority + 'static> {
c: Option<ControllerHandle<A>>,
#[allow(dead_code)]
event_tx: Option<futures::sync::mpsc::UnboundedSender<Event>>,
event_tx: Option<tokio_sync::mpsc::UnboundedSender<Event>>,
kill: Option<Trigger>,
iopool: Option<tokio_io_pool::Runtime>,
}
@@ -37,7 +37,7 @@ impl<A: Authority> DerefMut for WorkerHandle<A> {
impl<A: Authority + 'static> WorkerHandle<A> {
pub(super) fn new(
authority: Arc<A>,
event_tx: futures::sync::mpsc::UnboundedSender<Event>,
event_tx: tokio_sync::mpsc::UnboundedSender<Event>,
kill: Trigger,
io: tokio_io_pool::Runtime,
) -> impl Future<Item = Self, Error = failure::Error> {
@@ -52,9 +52,9 @@ impl<A: Authority + 'static> WorkerHandle<A> {
#[cfg(test)]
pub(crate) fn ready<E>(self) -> impl Future<Item = Self, Error = E> {
let snd = self.event_tx.clone().unwrap();
future::loop_fn((self, snd), |(this, snd)| {
let (tx, rx) = futures::sync::oneshot::channel();
snd.unbounded_send(Event::IsReady(tx)).unwrap();
future::loop_fn((self, snd), |(this, mut snd)| {
let (tx, rx) = tokio_sync::oneshot::channel();
snd.try_send(Event::IsReady(tx)).unwrap();
rx.map_err(|_| unimplemented!("worker loop went away"))
.and_then(|v| {
if v {
@@ -79,8 +79,8 @@ impl<A: Authority + 'static> WorkerHandle<A> {
F: FnOnce(&mut Migration) -> T + Send + 'static,
T: Send + 'static,
{
let (ret_tx, ret_rx) = futures::sync::oneshot::channel();
let (fin_tx, fin_rx) = futures::sync::oneshot::channel();
let (ret_tx, ret_rx) = tokio_sync::oneshot::channel();
let (fin_tx, fin_rx) = tokio_sync::oneshot::channel();
let b = Box::new(move |m: &mut Migration| -> () {
if ret_tx.send(f(m)).is_err() {
unreachable!("could not return migration result");
@@ -90,7 +90,7 @@ impl<A: Authority + 'static> WorkerHandle<A> {
self.event_tx
.clone()
.unwrap()
.unbounded_send(Event::ManualMigration { f: b, done: fin_tx })
.try_send(Event::ManualMigration { f: b, done: fin_tx })
.unwrap();

match fin_rx.wait() {
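The oneshot swap, by contrast, is almost one-to-one: `tokio_sync::oneshot::channel` has the same shape as `futures::sync::oneshot::channel`, with `send` consuming the sender and the receiver acting as a futures-0.1 `Future`. A small sketch of the round trip `migrate` relies on, assuming tokio-sync 0.1:

```rust
extern crate futures;
extern crate tokio_sync;

use futures::Future;

fn main() {
    let (tx, rx) = tokio_sync::oneshot::channel();
    // send consumes the sender; it errs only if the receiver is gone
    tx.send("migration done").unwrap();
    // wait() blocks the calling thread, as fin_rx.wait() does above
    assert_eq!(rx.wait().unwrap(), "migration done");
}
```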
4 changes: 2 additions & 2 deletions noria-server/src/controller/inner.rs
@@ -74,7 +74,7 @@ pub struct ControllerInner {
pub(crate) replies: DomainReplies,
}

pub(crate) struct DomainReplies(futures::sync::mpsc::UnboundedReceiver<ControlReplyPacket>);
pub(crate) struct DomainReplies(tokio_sync::mpsc::UnboundedReceiver<ControlReplyPacket>);

impl DomainReplies {
fn read_n_domain_replies(&mut self, n: usize) -> Vec<ControlReplyPacket> {
@@ -403,7 +403,7 @@ impl ControllerInner {
pub(super) fn new(
log: slog::Logger,
state: ControllerState,
drx: futures::sync::mpsc::UnboundedReceiver<ControlReplyPacket>,
drx: tokio_sync::mpsc::UnboundedReceiver<ControlReplyPacket>,
) -> Self {
let mut g = petgraph::Graph::new();
let source = g.add_node(node::Node::new(
