Skip to content

Commit

Permalink
avoid excess clone
Browse files Browse the repository at this point in the history
Signed-off-by: Brian L. Troutwine <brian@troutwine.us>
  • Loading branch information
blt committed Feb 7, 2022
1 parent db36820 commit d26acd5
Showing 1 changed file with 28 additions and 15 deletions.
43 changes: 28 additions & 15 deletions lib/vector-core/src/fanout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,39 @@ impl Fanout {
}

// TODO make a real error
// TODO we are cloning Event one more time than needed, dropping it
pub async fn send_all(&mut self, events: Vec<Event>) -> Result<(), ()> {
self.process_control_messages().await;

let mut jobs = FuturesUnordered::new();

for (_, ms) in self.sinks.iter_mut() {
if let Some(sink) = ms.as_mut() {
let events = events.clone();
jobs.push(async move {
for event in events {
sink.feed(event).await.unwrap();
}
sink.flush().await
});
if self.sinks.len() == 1 {
if let Some(sink) = &mut self.sinks[0].1 {
for event in events {
sink.feed(event).await.unwrap();
}
sink.flush().await.unwrap();
}
} else {
let mut jobs = FuturesUnordered::new();
let mut clone_army: Vec<Vec<Event>> = Vec::with_capacity(self.sinks.len());
for _ in 0..(self.sinks.len() - 1) {
clone_army.push(events.clone());
}
clone_army.push(events);

for (_, ms) in self.sinks.iter_mut() {
if let Some(sink) = ms.as_mut() {
let events: Vec<Event> = clone_army.pop().unwrap();
jobs.push(async move {
for event in events {
sink.feed(event).await.unwrap();
}
sink.flush().await
});
}
}
}

while let Some(res) = jobs.next().await {
res.unwrap();
while let Some(res) = jobs.next().await {
res.unwrap();
}
}

Ok(())
Expand Down

0 comments on commit d26acd5

Please sign in to comment.