Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
  • Loading branch information
blt committed Feb 7, 2022
1 parent a5800bb commit b167744
Showing 1 changed file with 33 additions and 45 deletions.
78 changes: 33 additions & 45 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ impl Fanout {
pub async fn send_all(&mut self, events: Vec<Event>) -> Result<(), ()> {
self.process_control_messages().await;

if self.sinks.len() == 1 {
if self.sinks.is_empty() {
// nothing, intentionally
} else if self.sinks.len() == 1 {
if let Some(sink) = &mut self.sinks[0].1 {
for event in events {
sink.feed(event).await.unwrap();
Expand All @@ -67,7 +69,7 @@ impl Fanout {
let events: Vec<Event> = clone_army.pop().unwrap();
jobs.push(async move {
for event in events {
sink.feed(event).await.unwrap();
sink.feed(event).await?;
}
sink.flush().await
});
Expand Down Expand Up @@ -242,20 +244,10 @@ impl Fanout {

#[cfg(test)]
mod tests {
use futures::{stream, FutureExt, Sink, SinkExt, StreamExt};
use futures::StreamExt;
use pretty_assertions::assert_eq;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::time::{sleep, Duration};
use vector_buffers::{
topology::{
builder::TopologyBuilder,
channel::{BufferSender, SenderAdapter},
},
WhenFull,
};
use vector_buffers::{topology::builder::TopologyBuilder, WhenFull};

use super::{ControlMessage, Fanout};
use crate::{config::ComponentKey, event::Event, test_util::collect_ready};
Expand All @@ -277,11 +269,9 @@ mod tests {
fanout.add(ComponentKey::from("b"), Box::pin(tx_b));

let recs = make_events(2);
let mut items = stream::iter(recs.clone());
let items = recs.clone();
tokio::spawn(async move {
while let Some(event) = items.next().await {
fanout.send(event).await.unwrap();
}
fanout.send_all(items).await.unwrap();
})
.await
.unwrap();
Expand All @@ -303,11 +293,9 @@ mod tests {
fanout.add(ComponentKey::from("c"), Box::pin(tx_c));

let recs = make_events(3);
let mut items = stream::iter(recs.clone());
let items = recs.clone();
tokio::spawn(async move {
while let Some(event) = items.next().await {
fanout.send(event).await.unwrap();
}
fanout.send_all(items).await.unwrap();
});

sleep(Duration::from_millis(50)).await;
Expand All @@ -334,13 +322,15 @@ mod tests {

let recs = make_events(3);

fanout.send(recs[0].clone()).await.unwrap();
fanout.send(recs[1].clone()).await.unwrap();
fanout
.send_all(vec![recs[0].clone(), recs[1].clone()])
.await
.unwrap();

let (tx_c, rx_c) = TopologyBuilder::memory(4, WhenFull::Block).await;
fanout.add(ComponentKey::from("c"), Box::pin(tx_c));

fanout.send(recs[2].clone()).await.unwrap();
fanout.send_all(vec![recs[2].clone()]).await.unwrap();

assert_eq!(collect_ready(rx_a), recs);
assert_eq!(collect_ready(rx_b), recs);
Expand All @@ -359,14 +349,16 @@ mod tests {

let recs = make_events(3);

fanout.send(recs[0].clone()).await.unwrap();
fanout.send(recs[1].clone()).await.unwrap();
fanout
.send_all(vec![recs[0].clone(), recs[1].clone()])
.await
.unwrap();

fanout_control
.send(ControlMessage::Remove(ComponentKey::from("b")))
.unwrap();

fanout.send(recs[2].clone()).await.unwrap();
fanout.send_all(vec![recs[2].clone()]).await.unwrap();

assert_eq!(collect_ready(rx_a), recs);
assert_eq!(collect_ready(rx_b), &recs[..2]);
Expand All @@ -383,13 +375,12 @@ mod tests {
fanout.add(ComponentKey::from("a"), Box::pin(tx_a));
fanout.add(ComponentKey::from("b"), Box::pin(tx_b));
fanout.add(ComponentKey::from("c"), Box::pin(tx_c));
// HERE

let recs = make_events(3);
let mut items = stream::iter(recs.clone());
let items = recs.clone();
tokio::spawn(async move {
while let Some(event) = items.next().await {
fanout.send(event).await.unwrap();
}
fanout.send_all(items).await.unwrap();
});

sleep(Duration::from_millis(50)).await;
Expand Down Expand Up @@ -420,11 +411,9 @@ mod tests {
fanout.add(ComponentKey::from("c"), Box::pin(tx_c));

let recs = make_events(3);
let mut items = stream::iter(recs.clone());
let items = recs.clone();
tokio::spawn(async move {
while let Some(event) = items.next().await {
fanout.send(event).await.unwrap();
}
fanout.send_all(items).await.unwrap();
});

sleep(Duration::from_millis(50)).await;
Expand Down Expand Up @@ -455,11 +444,9 @@ mod tests {
fanout.add(ComponentKey::from("c"), Box::pin(tx_c));

let recs = make_events(3);
let mut items = stream::iter(recs.clone());
let items = recs.clone();
tokio::spawn(async move {
while let Some(event) = items.next().await {
fanout.send(event).await.unwrap();
}
fanout.send_all(items).await.unwrap();
});

sleep(Duration::from_millis(50)).await;
Expand All @@ -484,8 +471,7 @@ mod tests {

let recs = make_events(2);

fanout.send(recs[0].clone()).await.unwrap();
fanout.send(recs[1].clone()).await.unwrap();
fanout.send_all(recs.clone()).await.unwrap();
}

#[tokio::test]
Expand All @@ -500,13 +486,15 @@ mod tests {

let recs = make_events(3);

fanout.send(recs[0].clone()).await.unwrap();
fanout.send(recs[1].clone()).await.unwrap();
fanout
.send_all(vec![recs[0].clone(), recs[1].clone()])
.await
.unwrap();

let (tx_a2, rx_a2) = TopologyBuilder::memory(4, WhenFull::Block).await;
fanout.replace(&ComponentKey::from("a"), Some(Box::pin(tx_a2)));

fanout.send(recs[2].clone()).await.unwrap();
fanout.send_all(vec![recs[2].clone()]).await.unwrap();

assert_eq!(collect_ready(rx_a1), &recs[..2]);
assert_eq!(collect_ready(rx_b), recs);
Expand Down

0 comments on commit b167744

Please sign in to comment.