Skip to content

Commit

Permalink
--amend
Browse files Browse the repository at this point in the history
Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
  • Loading branch information
blt committed Feb 8, 2022
1 parent cf164a9 commit 14af57d
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,9 @@ mod tests {
pin::Pin,
task::{Context, Poll},
};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use tokio_test::{
assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task::spawn,
};
use tokio::sync::mpsc::UnboundedSender;
use tokio_test::{assert_pending, assert_ready, task::spawn};
use vector_buffers::topology::channel::BufferReceiver;
use vector_buffers::topology::channel::{BufferSender, SenderAdapter};
use vector_buffers::{topology::builder::TopologyBuilder, WhenFull};

Expand Down Expand Up @@ -283,15 +281,13 @@ mod tests {

#[tokio::test]
async fn fanout_writes_to_all() {
let (fanout, _, receivers) = fanout_from_senders(&[2, 2]).await;
let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]).await;
let events = make_events(2);

stream::iter(events.clone())
.map(Ok)
.forward(fanout)
fanout
.send_all(events.clone())
.await
.expect("forward should not fail");

.expect("send_all should not fail");
for receiver in receivers {
assert_eq!(collect_ready(receiver), events);
}
Expand All @@ -303,13 +299,13 @@ mod tests {
let events = make_events(2);

// First send should immediately complete because all senders have capacity:
let mut first_send = spawn(async { fanout.send(events[0].clone()).await });
let mut first_send = spawn(async { fanout.send_all(vec![events[0].clone()]).await });
let first_send_result = assert_ready!(first_send.poll());
assert!(first_send_result.is_ok());
drop(first_send);

// Second send should return pending because sender B is now full:
let mut second_send = spawn(async { fanout.send(events[1].clone()).await });
let mut second_send = spawn(async { fanout.send_all(vec![events[1].clone()]).await });
assert_pending!(second_send.poll());

// Now read an item from each receiver to free up capacity for the second sender:
Expand All @@ -335,11 +331,11 @@ mod tests {

// Send in the first two events to our initial two senders:
fanout
.send(events[0].clone())
.send_all(vec![events[0].clone()])
.await
.expect("send should not fail");
fanout
.send(events[1].clone())
.send_all(vec![events[1].clone()])
.await
.expect("send should not fail");

Expand All @@ -348,7 +344,7 @@ mod tests {

// Send in the last event which all three senders will now get:
fanout
.send(events[2].clone())
.send_all(vec![events[2].clone()])
.await
.expect("send should not fail");

Expand All @@ -367,11 +363,11 @@ mod tests {

// Send in the first two events to our initial two senders:
fanout
.send(events[0].clone())
.send_all(vec![events[0].clone()])
.await
.expect("send should not fail");
fanout
.send(events[1].clone())
.send_all(vec![events[1].clone()])
.await
.expect("send should not fail");

Expand All @@ -380,7 +376,7 @@ mod tests {

// Send in the last event which only the first sender will get:
fanout
.send(events[2].clone())
.send_all(vec![events[2].clone()])
.await
.expect("send should not fail");

Expand All @@ -402,13 +398,13 @@ mod tests {
let events = make_events(2);

// First send should immediately complete because all senders have capacity:
let mut first_send = spawn(async { fanout.send(events[0].clone()).await });
let mut first_send = spawn(async { fanout.send_all(vec![events[0].clone()]).await });
let first_send_result = assert_ready!(first_send.poll());
assert!(first_send_result.is_ok());
drop(first_send);

// Second send should return pending because sender B is now full:
let mut second_send = spawn(async { fanout.send(events[1].clone()).await });
let mut second_send = spawn(async { fanout.send_all(vec![events[1].clone()]).await });
assert_pending!(second_send.poll());

// Now read an item from each receiver to free up capacity:
Expand Down Expand Up @@ -444,11 +440,11 @@ mod tests {
let events = make_events(2);

fanout
.send(events[0].clone())
.send_all(vec![events[0].clone()])
.await
.expect("send should not fail");
fanout
.send(events[1].clone())
.send_all(vec![events[1].clone()])
.await
.expect("send should not fail");
}
Expand All @@ -460,11 +456,11 @@ mod tests {

// First two sends should immediately complete because all senders have capacity:
fanout
.send(events[0].clone())
.send_all(vec![events[0].clone()])
.await
.expect("send should not fail");
fanout
.send(events[1].clone())
.send_all(vec![events[1].clone()])
.await
.expect("send should not fail");

Expand All @@ -473,7 +469,7 @@ mod tests {

// And do the third send which should also complete since all senders still have capacity:
fanout
.send(events[2].clone())
.send_all(vec![events[2].clone()])
.await
.expect("send should not fail");

Expand All @@ -495,11 +491,11 @@ mod tests {

// First two sends should immediately complete because all senders have capacity:
fanout
.send(events[0].clone())
.send_all(vec![events[0].clone()])
.await
.expect("send should not fail");
fanout
.send(events[1].clone())
.send_all(vec![events[1].clone()])
.await
.expect("send should not fail");

Expand All @@ -510,7 +506,7 @@ mod tests {
start_sender_replace(&control, &mut receivers, 0, 4).await;

// Third send should return pending because now we have an in-flight replacement:
let mut third_send = spawn(async { fanout.send(events[2].clone()).await });
let mut third_send = spawn(async { fanout.send_all(vec![events[2].clone()]).await });
assert_pending!(third_send.poll());

// Finish our sender replacement, which should wake up the third send and allow it to
Expand Down Expand Up @@ -591,8 +587,8 @@ mod tests {
// Spawn a task to send the events into the `Fanout`. We spawn a task so that we can await
// the receivers while the forward task drives itself to completion:
let events = make_events(3);
let send = stream::iter(events.clone()).map(Ok).forward(fanout);
tokio::spawn(send);
let items = events.clone();
tokio::spawn(async move { fanout.send_all(items).await });

// Wait for all of our receivers for non-erroring-senders to complete, and make sure they
// got all of the events we sent in. We also spawn these as tasks so they can empty
Expand Down Expand Up @@ -650,4 +646,10 @@ mod tests {
})
}
}

fn make_events(count: usize) -> Vec<Event> {
(0..count)
.map(|i| Event::from(format!("line {}", i)))
.collect()
}
}

0 comments on commit 14af57d

Please sign in to comment.