From 8df7026f52f9cc3b4d4a279ac57a2ec7eda442e3 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Tue, 10 Nov 2020 21:08:33 -0500 Subject: [PATCH 1/3] [span processor] Add force_flush method. See https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/sdk.md#forceflush for more details. --- opentelemetry/src/sdk/trace/span_processor.rs | 99 +++++++++++++------ opentelemetry/src/testing/trace.rs | 35 ++++++- 2 files changed, 103 insertions(+), 31 deletions(-) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 839743058a..4bf53c163d 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -70,8 +70,10 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// already set). This method is called synchronously within the `Span::end` /// API, therefore it should not block or throw an exception. fn on_end(&self, span: SpanData); + /// Force the spans lying in the cache to be exported. + fn force_flush(&self); /// Shuts down the processor. Called when SDK is shut down. This is an - /// opportunity for processor to do any cleanup required. + /// opportunity for processors to do any cleanup required. fn shutdown(&mut self); } @@ -123,6 +125,11 @@ impl SpanProcessor for SimpleSpanProcessor { } } + fn force_flush(&self) { + // Ignored since all span in Simple Processor will be exported as they ended. + } + + fn shutdown(&mut self) { if let Ok(mut exporter) = self.exporter.lock() { exporter.shutdown(); @@ -174,7 +181,7 @@ impl SpanProcessor for SimpleSpanProcessor { /// [`async-std`]: https://async.rs pub struct BatchSpanProcessor { message_sender: Mutex>, - worker_handle: Option + Send + Sync>>>, + worker_handle: Option + Send + Sync>>>, } impl fmt::Debug for BatchSpanProcessor { @@ -196,6 +203,12 @@ impl SpanProcessor for BatchSpanProcessor { } } + fn force_flush(&self) { + if let Ok(mut sender) = self.message_sender.lock() { + let _ = sender.try_send(BatchMessage::Flush); + } + } + fn shutdown(&mut self) { if let Ok(mut sender) = self.message_sender.lock() { // Send shutdown message to worker future @@ -212,7 +225,7 @@ impl SpanProcessor for BatchSpanProcessor { #[derive(Debug)] enum BatchMessage { ExportSpan(SpanData), - Tick, + Flush, Shutdown, } @@ -223,14 +236,14 @@ impl BatchSpanProcessor { interval: I, config: BatchConfig, ) -> Self - where - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); - let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Tick); + let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush); // Spawn worker process via user-defined spawn function. let worker_handle = spawn(Box::pin(async move { @@ -245,8 +258,8 @@ impl BatchSpanProcessor { spans.push(span); } } - // Span batch interval time reached, export current spans. - BatchMessage::Tick => { + // Span batch interval time reached or a force flush has been invoked, export current spans. + BatchMessage::Flush => { while !spans.is_empty() { let batch = spans.split_off( spans.len().saturating_sub(config.max_export_batch_size), @@ -272,7 +285,7 @@ impl BatchSpanProcessor { } } })) - .map(|_| ()); + .map(|_| ()); // Return batch processor with link to worker BatchSpanProcessor { @@ -287,11 +300,11 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, { BatchSpanProcessorBuilder { exporter, @@ -313,11 +326,11 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, { let mut config = BatchConfig::default(); let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) @@ -394,12 +407,12 @@ pub struct BatchSpanProcessorBuilder { } impl BatchSpanProcessorBuilder -where - E: SpanExporter + 'static, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + E: SpanExporter + 'static, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { @@ -450,8 +463,10 @@ mod tests { OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, }; use crate::exporter::trace::stdout; - use crate::testing::trace::{new_test_export_span_data, new_test_exporter}; + use crate::testing::trace::{new_test_export_span_data, new_test_exporter, new_tokio_test_exporter}; use std::time; + use crate::sdk::trace::BatchConfig; + use tokio::time::Duration; #[test] fn simple_span_processor_on_end_calls_export() { @@ -500,4 +515,30 @@ mod tests { assert_eq!(builder.config.max_export_batch_size, 120); assert_eq!(builder.config.max_queue_size, 120); } + + #[tokio::test] + async fn test_batch_span_processor() { + let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter(); + let mut config = BatchConfig::default(); + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported vis force_flush + let processor = BatchSpanProcessor::new( + Box::new(exporter), + tokio::spawn, + tokio::time::interval, + config, + ); + let handle = tokio::spawn(async move{ + loop { + if let Some(span) = export_receiver.recv().await { + assert_eq!(span.span_context, new_test_export_span_data().span_context); + break; + } + } + }); + tokio::time::delay_for(Duration::from_secs(1)).await; // skip the first + processor.on_end(new_test_export_span_data()); + processor.force_flush(); + + assert!(tokio::time::timeout(Duration::from_secs(5), handle).await.is_ok()); + } } diff --git a/opentelemetry/src/testing/trace.rs b/opentelemetry/src/testing/trace.rs index 874d0ce76d..1c33105bd6 100644 --- a/opentelemetry/src/testing/trace.rs +++ b/opentelemetry/src/testing/trace.rs @@ -10,6 +10,7 @@ use crate::{ use async_trait::async_trait; use std::sync::mpsc::{channel, Receiver, Sender}; use std::time::SystemTime; +use crate::exporter::trace::SpanData; #[derive(Debug)] pub struct TestSpan(pub SpanContext); @@ -20,8 +21,7 @@ impl Span for TestSpan { _name: String, _timestamp: std::time::SystemTime, _attributes: Vec, - ) { - } + ) {} fn span_context(&self) -> &SpanContext { &self.0 } @@ -82,3 +82,34 @@ pub fn new_test_exporter() -> (TestSpanExporter, Receiver, R }; (exporter, rx_export, rx_shutdown) } + +#[derive(Debug)] +pub struct TokioSpanExporter { + tx_export: tokio::sync::mpsc::UnboundedSender, + tx_shutdown: tokio::sync::mpsc::UnboundedSender<()>, +} + +#[async_trait] +impl SpanExporter for TokioSpanExporter { + async fn export(&mut self, batch: Vec) -> ExportResult { + for span_data in batch { + self.tx_export.send(span_data)?; + } + Ok(()) + } + + fn shutdown(&mut self) { + self.tx_shutdown.send(()).unwrap(); + } +} + +pub fn new_tokio_test_exporter() -> (TokioSpanExporter, tokio::sync::mpsc::UnboundedReceiver, tokio::sync::mpsc::UnboundedReceiver<()>) { + let (tx_export, rx_export) = tokio::sync::mpsc::unbounded_channel(); + let (tx_shutdown, rx_shutdown) = tokio::sync::mpsc::unbounded_channel(); + let exporter = TokioSpanExporter{ + tx_export, + tx_shutdown, + }; + (exporter, rx_export, rx_shutdown) +} + From 925b59f1d43dcafe487fe99a67df6ca4b1460259 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Tue, 10 Nov 2020 21:11:17 -0500 Subject: [PATCH 2/3] (fix) docs and format. --- opentelemetry/src/sdk/trace/span_processor.rs | 64 ++++++++++--------- opentelemetry/src/testing/trace.rs | 14 ++-- 2 files changed, 44 insertions(+), 34 deletions(-) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 4bf53c163d..39d6e7452f 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -126,10 +126,9 @@ impl SpanProcessor for SimpleSpanProcessor { } fn force_flush(&self) { - // Ignored since all span in Simple Processor will be exported as they ended. + // Ignored since all spans in Simple Processor will be exported as they ended. } - fn shutdown(&mut self) { if let Ok(mut exporter) = self.exporter.lock() { exporter.shutdown(); @@ -181,7 +180,7 @@ impl SpanProcessor for SimpleSpanProcessor { /// [`async-std`]: https://async.rs pub struct BatchSpanProcessor { message_sender: Mutex>, - worker_handle: Option + Send + Sync>>>, + worker_handle: Option + Send + Sync>>>, } impl fmt::Debug for BatchSpanProcessor { @@ -236,11 +235,11 @@ impl BatchSpanProcessor { interval: I, config: BatchConfig, ) -> Self - where - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush); @@ -285,7 +284,7 @@ impl BatchSpanProcessor { } } })) - .map(|_| ()); + .map(|_| ()); // Return batch processor with link to worker BatchSpanProcessor { @@ -300,11 +299,11 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, { BatchSpanProcessorBuilder { exporter, @@ -326,11 +325,11 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, { let mut config = BatchConfig::default(); let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) @@ -407,12 +406,12 @@ pub struct BatchSpanProcessorBuilder { } impl BatchSpanProcessorBuilder - where - E: SpanExporter + 'static, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, +where + E: SpanExporter + 'static, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { @@ -463,9 +462,11 @@ mod tests { OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, }; use crate::exporter::trace::stdout; - use crate::testing::trace::{new_test_export_span_data, new_test_exporter, new_tokio_test_exporter}; - use std::time; use crate::sdk::trace::BatchConfig; + use crate::testing::trace::{ + new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, + }; + use std::time; use tokio::time::Duration; #[test] @@ -527,7 +528,7 @@ mod tests { tokio::time::interval, config, ); - let handle = tokio::spawn(async move{ + let handle = tokio::spawn(async move { loop { if let Some(span) = export_receiver.recv().await { assert_eq!(span.span_context, new_test_export_span_data().span_context); @@ -539,6 +540,11 @@ mod tests { processor.on_end(new_test_export_span_data()); processor.force_flush(); - assert!(tokio::time::timeout(Duration::from_secs(5), handle).await.is_ok()); + assert!( + tokio::time::timeout(Duration::from_secs(5), handle) + .await + .is_ok(), + "timed out in 5 seconds. force_flush may not export any data when called" + ); } } diff --git a/opentelemetry/src/testing/trace.rs b/opentelemetry/src/testing/trace.rs index 1c33105bd6..da86e7f065 100644 --- a/opentelemetry/src/testing/trace.rs +++ b/opentelemetry/src/testing/trace.rs @@ -1,3 +1,4 @@ +use crate::exporter::trace::SpanData; use crate::{ exporter::trace::{self as exporter, ExportResult, SpanExporter}, sdk::{ @@ -10,7 +11,6 @@ use crate::{ use async_trait::async_trait; use std::sync::mpsc::{channel, Receiver, Sender}; use std::time::SystemTime; -use crate::exporter::trace::SpanData; #[derive(Debug)] pub struct TestSpan(pub SpanContext); @@ -21,7 +21,8 @@ impl Span for TestSpan { _name: String, _timestamp: std::time::SystemTime, _attributes: Vec, - ) {} + ) { + } fn span_context(&self) -> &SpanContext { &self.0 } @@ -103,13 +104,16 @@ impl SpanExporter for TokioSpanExporter { } } -pub fn new_tokio_test_exporter() -> (TokioSpanExporter, tokio::sync::mpsc::UnboundedReceiver, tokio::sync::mpsc::UnboundedReceiver<()>) { +pub fn new_tokio_test_exporter() -> ( + TokioSpanExporter, + tokio::sync::mpsc::UnboundedReceiver, + tokio::sync::mpsc::UnboundedReceiver<()>, +) { let (tx_export, rx_export) = tokio::sync::mpsc::unbounded_channel(); let (tx_shutdown, rx_shutdown) = tokio::sync::mpsc::unbounded_channel(); - let exporter = TokioSpanExporter{ + let exporter = TokioSpanExporter { tx_export, tx_shutdown, }; (exporter, rx_export, rx_shutdown) } - From e99171266a10b8c410be87a2b2b2f584caf9e0ad Mon Sep 17 00:00:00 2001 From: Zhongyang Wu Date: Wed, 11 Nov 2020 12:20:48 -0500 Subject: [PATCH 3/3] Update opentelemetry/src/sdk/trace/span_processor.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jan Kühle --- opentelemetry/src/sdk/trace/span_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 39d6e7452f..1d136fe62e 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -521,7 +521,7 @@ mod tests { async fn test_batch_span_processor() { let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter(); let mut config = BatchConfig::default(); - config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported vis force_flush + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush let processor = BatchSpanProcessor::new( Box::new(exporter), tokio::spawn,