Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
  • Loading branch information
blt committed Feb 7, 2022
1 parent 7f1ebb4 commit 6a6765c
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 157 deletions.
326 changes: 172 additions & 154 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,45 @@ impl Fanout {
} else if self.sinks.len() == 1 {
if let Some(sink) = &mut self.sinks[0].1 {
for event in events {
sink.feed(event).await.unwrap();
sink.feed(event).await?;
}
sink.flush().await.unwrap();
sink.flush().await?;
}
} else {
let mut jobs = FuturesUnordered::new();
let mut clone_army: Vec<Vec<Event>> = Vec::with_capacity(self.sinks.len());
for _ in 0..(self.sinks.len() - 1) {
clone_army.push(events.clone());
}
clone_army.push(events);

for (_, ms) in self.sinks.iter_mut() {
if let Some(sink) = ms.as_mut() {
let events: Vec<Event> = clone_army.pop().unwrap();
jobs.push(async move {
for event in events {
sink.feed(event).await?;
let mut faulty_sinks = Vec::with_capacity(0);

{
let mut jobs = FuturesUnordered::new();
let mut clone_army: Vec<Vec<Event>> = Vec::with_capacity(self.sinks.len());
for _ in 0..(self.sinks.len() - 1) {
clone_army.push(events.clone());
}
clone_army.push(events);

for (id, ms) in self.sinks.iter_mut() {
if let Some(sink) = ms.as_mut() {
let events: Vec<Event> = clone_army.pop().unwrap();
jobs.push(async move {
for event in events {
sink.feed(event).await.map_err(|e| (id.clone(), e))?
}
sink.flush().await.map_err(|e| (id.clone(), e))
});
}
}

while let Some(res) = jobs.next().await {
match res {
Ok(()) => continue,
Err((id, ())) => {
faulty_sinks.push(id);
}
sink.flush().await
});
}
}
}

while let Some(res) = jobs.next().await {
res.unwrap();
for id in faulty_sinks.drain(..) {
self.remove(&id)
}
}

Expand Down Expand Up @@ -123,8 +136,8 @@ impl Fanout {
Ok(ControlMessage::Add(id, sink)) => self.add(id, sink),
Ok(ControlMessage::Remove(id)) => self.remove(&id),
Ok(ControlMessage::Replace(id, sink)) => self.replace(&id, sink),
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => unimplemented!(),
Err(mpsc::error::TryRecvError::Empty)
| Err(mpsc::error::TryRecvError::Disconnected) => break,
}
}
}
Expand All @@ -140,7 +153,7 @@ impl Fanout {
// if self.sinks.len() == 1 {
// Err(())
// } else {
// self.sinks.remove(index);
//
// Ok(())
// }
// }
Expand Down Expand Up @@ -244,9 +257,16 @@ impl Fanout {

#[cfg(test)]
mod tests {
use futures::StreamExt;
use futures::{Sink, StreamExt};
use futures_util::SinkExt;
use pretty_assertions::assert_eq;
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use vector_buffers::topology::channel::{BufferSender, SenderAdapter};
use vector_buffers::{topology::builder::TopologyBuilder, WhenFull};

use super::{ControlMessage, Fanout};
Expand Down Expand Up @@ -506,162 +526,160 @@ mod tests {
assert_eq!(collect_ready(rx_a2), &recs[2..]);
}

// #[tokio::test]
// async fn fanout_wait() {
// let (tx_a1, rx_a1) = TopologyBuilder::memory(4, WhenFull::Block).await;
// let (tx_b, rx_b) = TopologyBuilder::memory(4, WhenFull::Block).await;
#[tokio::test]
async fn fanout_wait() {
let (tx_a1, rx_a1) = TopologyBuilder::memory(4, WhenFull::Block).await;
let (tx_b, rx_b) = TopologyBuilder::memory(4, WhenFull::Block).await;

// let (mut fanout, fanout_control) = Fanout::new();
let (mut fanout, fanout_control) = Fanout::new();

// fanout.add(ComponentKey::from("a"), Box::pin(tx_a1));
// fanout.add(ComponentKey::from("b"), Box::pin(tx_b));
fanout.add(ComponentKey::from("a"), Box::pin(tx_a1));
fanout.add(ComponentKey::from("b"), Box::pin(tx_b));

// let recs = make_events(3);
let recs = make_events(3);

// fanout.send(recs[0].clone()).await.unwrap();
// fanout.send(recs[1].clone()).await.unwrap();
fanout.send_all(vec![recs[0].clone()]).await.unwrap();
fanout.send_all(vec![recs[1].clone()]).await.unwrap();

// let (tx_a2, rx_a2) = TopologyBuilder::memory(4, WhenFull::Block).await;
// fanout.replace(&ComponentKey::from("a"), None);
let (tx_a2, rx_a2) = TopologyBuilder::memory(4, WhenFull::Block).await;
fanout.replace(&ComponentKey::from("a"), None);

// futures::join!(
// async {
// sleep(Duration::from_millis(100)).await;
// fanout_control
// .send(ControlMessage::Replace(
// ComponentKey::from("a"),
// Some(Box::pin(tx_a2)),
// ))
// .unwrap();
// },
// fanout.send(recs[2].clone()).map(|_| ())
// );
tokio::spawn(async move {
sleep(Duration::from_millis(100)).await;
fanout_control
.send(ControlMessage::Replace(
ComponentKey::from("a"),
Some(Box::pin(tx_a2)),
))
.unwrap();
})
.await
.unwrap();
fanout.send_all(vec![recs[2].clone()]).await.unwrap();

// assert_eq!(collect_ready(rx_a1), &recs[..2]);
// assert_eq!(collect_ready(rx_b), recs);
// assert_eq!(collect_ready(rx_a2), &recs[2..]);
// }
assert_eq!(collect_ready(rx_a1), &recs[..2]);
assert_eq!(collect_ready(rx_b), recs);
assert_eq!(collect_ready(rx_a2), &recs[2..]);
}

// #[tokio::test]
// async fn fanout_error_poll_first() {
// fanout_error(&[Some(ErrorWhen::Poll), None, None]).await;
// }
#[tokio::test]
async fn fanout_error_poll_first() {
fanout_error(&[Some(ErrorWhen::Poll), None, None]).await;
}

// #[tokio::test]
// async fn fanout_error_poll_middle() {
// fanout_error(&[None, Some(ErrorWhen::Poll), None]).await;
// }
#[tokio::test]
async fn fanout_error_poll_middle() {
fanout_error(&[None, Some(ErrorWhen::Poll), None]).await;
}

// #[tokio::test]
// async fn fanout_error_poll_last() {
// fanout_error(&[None, None, Some(ErrorWhen::Poll)]).await;
// }
#[tokio::test]
async fn fanout_error_poll_last() {
fanout_error(&[None, None, Some(ErrorWhen::Poll)]).await;
}

// #[tokio::test]
// async fn fanout_error_poll_not_middle() {
// fanout_error(&[Some(ErrorWhen::Poll), None, Some(ErrorWhen::Poll)]).await;
// }
#[tokio::test]
async fn fanout_error_poll_not_middle() {
fanout_error(&[Some(ErrorWhen::Poll), None, Some(ErrorWhen::Poll)]).await;
}

// #[tokio::test]
// async fn fanout_error_send_first() {
// fanout_error(&[Some(ErrorWhen::Send), None, None]).await;
// }
#[tokio::test]
async fn fanout_error_send_first() {
fanout_error(&[Some(ErrorWhen::Send), None, None]).await;
}

// #[tokio::test]
// async fn fanout_error_send_middle() {
// fanout_error(&[None, Some(ErrorWhen::Send), None]).await;
// }
#[tokio::test]
async fn fanout_error_send_middle() {
fanout_error(&[None, Some(ErrorWhen::Send), None]).await;
}

// #[tokio::test]
// async fn fanout_error_send_last() {
// fanout_error(&[None, None, Some(ErrorWhen::Send)]).await;
// }
#[tokio::test]
async fn fanout_error_send_last() {
fanout_error(&[None, None, Some(ErrorWhen::Send)]).await;
}

// #[tokio::test]
// async fn fanout_error_send_not_middle() {
// fanout_error(&[Some(ErrorWhen::Send), None, Some(ErrorWhen::Send)]).await;
// }
#[tokio::test]
async fn fanout_error_send_not_middle() {
fanout_error(&[Some(ErrorWhen::Send), None, Some(ErrorWhen::Send)]).await;
}

// async fn fanout_error(modes: &[Option<ErrorWhen>]) {
// let (mut fanout, _fanout_control) = Fanout::new();
// let mut rx_channels = vec![];

// for (i, mode) in modes.iter().enumerate() {
// let id = ComponentKey::from(format!("{}", i));
// if let Some(when) = *mode {
// let tx = AlwaysErrors { when };
// let tx = SenderAdapter::opaque(tx.sink_map_err(|_| ()));
// let tx = BufferSender::new(tx, WhenFull::Block);

// fanout.add(id, Box::pin(tx));
// } else {
// let (tx, rx) = TopologyBuilder::memory(0, WhenFull::Block).await;
// fanout.add(id, Box::pin(tx));
// rx_channels.push(rx);
// }
// }
async fn fanout_error(modes: &[Option<ErrorWhen>]) {
let (mut fanout, _fanout_control) = Fanout::new();
let mut rx_channels = vec![];

for (i, mode) in modes.iter().enumerate() {
let id = ComponentKey::from(format!("{}", i));
if let Some(when) = *mode {
let tx = AlwaysErrors { when };
let tx = SenderAdapter::opaque(tx.sink_map_err(|_| ()));
let tx = BufferSender::new(tx, WhenFull::Block);

fanout.add(id, Box::pin(tx));
} else {
let (tx, rx) = TopologyBuilder::memory(0, WhenFull::Block).await;
fanout.add(id, Box::pin(tx));
rx_channels.push(rx);
}
}

// let recs = make_events(3);
// let mut items = stream::iter(recs.clone());
// tokio::spawn(async move {
// while let Some(event) = items.next().await {
// fanout.send(event).await.unwrap();
// }
// });
let recs = make_events(3);
let items = recs.clone();
tokio::spawn(async move {
fanout.send_all(items).await.unwrap();
});

// sleep(Duration::from_millis(50)).await;
sleep(Duration::from_millis(50)).await;

// // Start collecting from all at once
// let collectors = rx_channels
// .into_iter()
// .map(|rx| tokio::spawn(rx.collect::<Vec<_>>()))
// .collect::<Vec<_>>();
// Start collecting from all at once
let collectors: Vec<JoinHandle<Vec<Event>>> = rx_channels
.into_iter()
.map(|rx| tokio::spawn(rx.collect::<Vec<_>>()))
.collect::<Vec<_>>();

// for collect in collectors {
// assert_eq!(collect.await.unwrap(), recs);
// }
// }
for collect in collectors {
assert_eq!(collect.await.unwrap(), recs);
}
}

// #[derive(Clone, Copy)]
// enum ErrorWhen {
// Send,
// Poll,
// }
#[derive(Clone, Copy)]
enum ErrorWhen {
Send,
Poll,
}

// #[derive(Clone)]
// struct AlwaysErrors {
// when: ErrorWhen,
// }
#[derive(Clone)]
struct AlwaysErrors {
when: ErrorWhen,
}

// impl Sink<Event> for AlwaysErrors {
// type Error = crate::Error;
impl Sink<Event> for AlwaysErrors {
type Error = crate::Error;

// fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Poll::Ready(match self.when {
// ErrorWhen::Poll => Err("Something failed".into()),
// _ => Ok(()),
// })
// }
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(match self.when {
ErrorWhen::Poll => Err("Something failed".into()),
_ => Ok(()),
})
}

// fn start_send(self: Pin<&mut Self>, _: Event) -> Result<(), Self::Error> {
// match self.when {
// ErrorWhen::Poll => Err("Something failed".into()),
// _ => Ok(()),
// }
// }
fn start_send(self: Pin<&mut Self>, _: Event) -> Result<(), Self::Error> {
match self.when {
ErrorWhen::Poll => Err("Something failed".into()),
_ => Ok(()),
}
}

// fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Poll::Ready(match self.when {
// ErrorWhen::Poll => Err("Something failed".into()),
// _ => Ok(()),
// })
// }
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(match self.when {
ErrorWhen::Poll => Err("Something failed".into()),
_ => Ok(()),
})
}

// fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Poll::Ready(match self.when {
// ErrorWhen::Poll => Err("Something failed".into()),
// _ => Ok(()),
// })
// }
// }
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(match self.when {
ErrorWhen::Poll => Err("Something failed".into()),
_ => Ok(()),
})
}
}
}

0 comments on commit 6a6765c

Please sign in to comment.