Skip to content

Commit

Permalink
remove comments, clippy ding
Browse files Browse the repository at this point in the history
Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
  • Loading branch information
blt committed Feb 8, 2022
1 parent 34120c0 commit 8ac2b1c
Showing 1 changed file with 21 additions and 116 deletions.
137 changes: 21 additions & 116 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,24 @@ impl Fanout {
(fanout, control_tx)
}

// TODO make a real error
/// Send a full vec of Events into the Fanout
///
/// This function accepts a `Vec<Event>` and duplicates it down into the
/// Fanout's sinks. We take care to avoid the slow receiver problem as much
/// as possible but any misbehaving downstream WILL cause this function to
/// suffer high latencies. The memory use of this function is
/// `N*mem::size_of(events)`, where N is the number of downstream targets.
///
/// # Errors
///
/// Function will error with `()` in case downstream Sink operations
/// fail. This is, we recognize, not very helpful.
///
/// # Panics
///
/// This function will only panic if there are internal logic errors,
/// specifically around the size of the buffer created to house clones of
/// Vec<Event>.
pub async fn send_all(&mut self, events: Vec<Event>) -> Result<(), ()> {
self.process_control_messages().await;

Expand All @@ -67,12 +84,12 @@ impl Fanout {
}
clone_army.push(events);

for (id, ms) in self.sinks.iter_mut() {
for (id, ms) in &mut self.sinks {
if let Some(sink) = ms.as_mut() {
let events: Vec<Event> = clone_army.pop().unwrap();
jobs.push(async move {
for event in events {
sink.feed(event).await.map_err(|e| (id.clone(), e))?
sink.feed(event).await.map_err(|e| (id.clone(), e))?;
}
sink.flush().await.map_err(|e| (id.clone(), e))
});
Expand All @@ -87,7 +104,7 @@ impl Fanout {
}

for id in faulty_sinks.drain(..) {
self.remove(&id)
self.remove(&id);
}
}

Expand Down Expand Up @@ -138,120 +155,8 @@ impl Fanout {
}
}
}

// TODO we still need some way to understand and deal with errors
//
// #[inline]
// fn handle_sink_error(&mut self, index: usize) -> Result<(), ()> {
// // If there's only one sink, propagate the error to the source ASAP
// // so it stops reading from its input. If there are multiple sinks,
// // keep pushing to the non-errored ones (while the errored sink
// // triggers a more graceful shutdown).
// if self.sinks.len() == 1 {
// Err(())
// } else {
//
// Ok(())
// }
// }

// fn poll_sinks<F>(
// mut self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// poll: F,
// ) -> Poll<Result<(), ()>>
// where
// F: Fn(
// Pin<&mut (dyn Sink<Event, Error = ()> + Send)>,
// &mut Context<'_>,
// ) -> Poll<Result<(), ()>>,
// {
// self.process_control_messages(cx);

// let mut poll_result = Poll::Ready(Ok(()));

// let mut i = 0;
// while let Some((_, sink)) = self.sinks.get_mut(i) {
// if let Some(sink) = sink {
// match poll(sink.as_mut(), cx) {
// Poll::Pending => poll_result = Poll::Pending,
// Poll::Ready(Ok(())) => (),
// Poll::Ready(Err(())) => {
// self.handle_sink_error(i)?;
// continue;
// }
// }
// }
// i += 1;
// }

// poll_result
// }
}

// impl Sink<Event> for Fanout {
// type Error = ();

// fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
// let this = self.get_mut();

// this.process_control_messages(cx);

// while let Some((_, sink)) = this.sinks.get_mut(this.i) {
// match sink {
// Some(sink) => match sink.as_mut().poll_ready(cx) {
// Poll::Pending => return Poll::Pending,
// Poll::Ready(Ok(())) => this.i += 1,
// Poll::Ready(Err(())) => this.handle_sink_error(this.i)?,
// },
// // process_control_messages ended because control channel returned
// // Pending so it's fine to return Pending here since the control
// // channel will notify current task when it receives a message.
// None => return Poll::Pending,
// }
// }

// this.i = 0;

// Poll::Ready(Ok(()))
// }

// fn start_send(mut self: Pin<&mut Self>, item: Event) -> Result<(), ()> {
// let mut items = vec![item; self.sinks.len()];

// let mut i = 1;
// while let Some((_, sink)) = self.sinks.get_mut(i) {
// if let Some(sink) = sink.as_mut() {
// let item = items.pop().unwrap();
// if sink.as_mut().start_send(item).is_err() {
// self.handle_sink_error(i)?;
// continue;
// }
// }
// i += 1;
// }

// if let Some((_, sink)) = self.sinks.first_mut() {
// if let Some(sink) = sink.as_mut() {
// let item = items.pop().unwrap();
// if sink.as_mut().start_send(item).is_err() {
// self.handle_sink_error(0)?;
// }
// }
// }

// Ok(())
// }

// fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
// self.poll_sinks(cx, |sink, cx| sink.poll_flush(cx))
// }

// fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
// self.poll_sinks(cx, |sink, cx| sink.poll_close(cx))
// }
// }

#[cfg(test)]
mod tests {
use futures::{Sink, StreamExt};
Expand Down

0 comments on commit 8ac2b1c

Please sign in to comment.