Skip to content

Commit

Permalink
fix: allow multiple pumps from a source stream
Browse files Browse the repository at this point in the history
  • Loading branch information
laysakura committed May 11, 2022
1 parent 9044d96 commit d5710e7
Showing 1 changed file with 15 additions and 21 deletions.
Expand Up @@ -48,45 +48,39 @@ impl SourceTask {
) -> Result<MetricsUpdateByTaskExecution> {
let stopwatch = WallClockStopwatch::start();

assert_eq!(
context.output_queues().len(),
1,
"source task must have 1 output queue"
);

let out_qid = context
.output_queues()
.first()
.expect("source task must have 1 output queue")
.clone();
let out_queue_metrics = self.put_row_into(out_qid, context);
let out_queue_metrics_seq = match self.collect_next(context) {
Some(row) => {
context
.output_queues()
.into_iter()
.map(|out_qid| self.put_row_into(out_qid, row.clone(), context)) // remove None metrics
.collect::<Vec<OutQueueMetricsUpdateByTask>>()
}
None => vec![],
};

let execution_time = stopwatch.stop();

let task_metrics = TaskMetricsUpdateByTask::new(context.task(), execution_time);
Ok(MetricsUpdateByTaskExecution::new(
task_metrics,
vec![],
out_queue_metrics.map_or_else(Vec::new, |o| vec![o]),
out_queue_metrics_seq,
))
}

/// # Returns
///
/// None if no row is collected from the foreign source.
fn put_row_into(
&self,
queue_id: QueueId,
row: Row,
context: &TaskContext,
) -> Option<OutQueueMetricsUpdateByTask> {
let row = self.collect_next(context)?;
) -> OutQueueMetricsUpdateByTask {
let repos = context.repos();

let out_queue_metrics = match queue_id {
match queue_id {
QueueId::Row(queue_id) => self.put_row_into_row_queue(row, queue_id, repos),
QueueId::Window(queue_id) => self.put_row_into_window_queue(row, queue_id, repos),
};
Some(out_queue_metrics)
}
}
fn put_row_into_row_queue(
&self,
Expand Down

0 comments on commit d5710e7

Please sign in to comment.