From 463823c1431dad8f015ed96c53b4fbac40774af1 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Thu, 12 Nov 2020 22:10:06 -0500 Subject: [PATCH 01/10] Implement timeout for force_flush and shutdown. Note that the spec only has configuration for maximum time to export data. So the timeout is based on the export time. --- opentelemetry/Cargo.toml | 1 + opentelemetry/src/sdk/trace/span_processor.rs | 211 +++++++++++++++--- 2 files changed, 184 insertions(+), 28 deletions(-) diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index c7b9d4f804..cf1a688b9c 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -44,6 +44,7 @@ surf = { version = "2.0", default-features = false, optional = true } criterion = "0.3.1" rand_distr = "0.3.0" tokio = { version = "0.2", features = ["full"] } +async-std = { version = "1.6", features = ["unstable"]} [features] default = ["trace"] diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 1d136fe62e..89e012c21b 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -44,6 +44,11 @@ use std::pin::Pin; use std::str::FromStr; use std::sync::Mutex; use std::time; +use crate::exporter::trace::ExportResult; +use std::error::Error; +use std::fmt::Display; +use futures::pin_mut; +use futures::future::Either; /// Delay interval between two consecutive exports, default to be 5000. const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS"; @@ -57,6 +62,10 @@ const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2048; const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; +/// Maximum allowed time to export data +// const OTEL_BSP_EXPORT_TIMEOUT_MILLIS: &str = "OTEL_BSP_EXPORT_TIMEOUT_MILLIS"; +/// Default maximum allowed time to export data +const OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT: u64 = 30000; /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. 
The span processors are invoked only when is_recording @@ -180,7 +189,7 @@ impl SpanProcessor for SimpleSpanProcessor { /// [`async-std`]: https://async.rs pub struct BatchSpanProcessor { message_sender: Mutex>, - worker_handle: Option + Send + Sync>>>, + worker_handle: Option + Send + Sync>>>, } impl fmt::Debug for BatchSpanProcessor { @@ -228,18 +237,32 @@ enum BatchMessage { Shutdown, } +#[derive(Debug, Default)] +struct TimeoutErr {} + +impl Display for TimeoutErr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("timed out") + } +} + +impl Error for TimeoutErr {} + impl BatchSpanProcessor { - pub(crate) fn new( + pub(crate) fn new( mut exporter: Box, spawn: S, interval: I, + delay: D, config: BatchConfig, ) -> Self - where - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush); @@ -264,8 +287,15 @@ impl BatchSpanProcessor { spans.len().saturating_sub(config.max_export_batch_size), ); + let export = exporter.export(batch); + let timeout = delay(config.max_export_timeout); + pin_mut!(export); + pin_mut!(timeout); // TODO: Surface error through global error handler - let _result = exporter.export(batch).await; + let _result = match futures::future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => ExportResult::Err(Box::new(TimeoutErr::default())), + }; } } // Stream has terminated or processor is shutdown, return to finish execution. 
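
The hunk above implements the timeout by racing the export future against a runtime-provided delay future and taking whichever resolves first. A minimal standalone sketch of that select-based pattern, assuming async-std is available for the sleep (the exporter and error type here are stand-ins for illustration, not the crate's actual SpanExporter or TimeoutErr):

    use futures::future::{select, Either};
    use futures::pin_mut;
    use std::time::Duration;

    // Stand-in for `exporter.export(batch)`: any future that may outlive the timeout.
    async fn slow_export() -> Result<(), String> {
        async_std::task::sleep(Duration::from_secs(60)).await;
        Ok(())
    }

    async fn export_with_timeout(timeout: Duration) -> Result<(), String> {
        let export = slow_export();
        let delay = async_std::task::sleep(timeout);
        pin_mut!(export);
        pin_mut!(delay);
        match select(export, delay).await {
            // The export finished first: propagate its result.
            Either::Left((res, _)) => res,
            // The delay fired first: report a timeout instead of waiting on the export.
            Either::Right((_, _)) => Err("timed out".into()),
        }
    }

    fn main() {
        let res = async_std::task::block_on(export_with_timeout(Duration::from_millis(10)));
        assert!(res.is_err());
    }
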
@@ -275,8 +305,15 @@ impl BatchSpanProcessor { spans.len().saturating_sub(config.max_export_batch_size), ); + let export = exporter.export(batch); + let timeout = delay(config.max_export_timeout); + pin_mut!(export); + pin_mut!(timeout); // TODO: Surface error through global error handler - let _result = exporter.export(batch).await; + let _result = match futures::future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => ExportResult::Err(Box::new(TimeoutErr::default())), + }; } exporter.shutdown(); break; @@ -284,7 +321,7 @@ impl BatchSpanProcessor { } } })) - .map(|_| ()); + .map(|_| ()); // Return batch processor with link to worker BatchSpanProcessor { @@ -299,11 +336,11 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, { BatchSpanProcessorBuilder { exporter, @@ -325,11 +362,11 @@ impl BatchSpanProcessor { spawn: S, interval: I, ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, { let mut config = BatchConfig::default(); let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) @@ -382,6 +419,9 @@ pub struct BatchConfig { /// of spans one batch after the other without any delay. The default value /// is 512. max_export_batch_size: usize, + + /// The maximum duration to export a batch of data. 
+ max_export_timeout: time::Duration, } impl Default for BatchConfig { @@ -390,6 +430,7 @@ impl Default for BatchConfig { max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, scheduled_delay: time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT), max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, + max_export_timeout: time::Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT), } } } @@ -406,12 +447,12 @@ pub struct BatchSpanProcessorBuilder { } impl BatchSpanProcessorBuilder -where - E: SpanExporter + 'static, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + E: SpanExporter + 'static, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { @@ -449,25 +490,32 @@ where Box::new(self.exporter), self.spawn, self.interval, + tokio::time::delay_for, self.config, ) } } #[cfg(test)] +#[allow(warnings)] mod tests { use super::{ BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS, OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, }; - use crate::exporter::trace::stdout; + use crate::exporter::trace::{stdout, SpanExporter, SpanData, ExportResult}; + use futures::pin_mut; use crate::sdk::trace::BatchConfig; use crate::testing::trace::{ new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, }; use std::time; - use tokio::time::Duration; + use async_trait::async_trait; + use std::time::{Instant, Duration}; + use async_std::prelude::*; + use std::ops::Sub; + use std::fmt::Debug; #[test] fn simple_span_processor_on_end_calls_export() { @@ -526,6 +574,7 @@ mod tests { Box::new(exporter), tokio::spawn, tokio::time::interval, + tokio::time::delay_for, config, ); let handle = tokio::spawn(async move { @@ -547,4 +596,110 @@ mod tests { "timed out in 5 seconds. 
force_flush may not export any data when called" ); } + + struct BlockingExporter { + delay_for: time::Duration, + delay_fn: D, + } + + impl Debug for BlockingExporter + where D: Fn(time::Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("blocking exporter for testing") + } + } + + #[async_trait] + impl SpanExporter for BlockingExporter + where D: Fn(time::Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static { + async fn export(&mut self, batch: Vec) -> ExportResult { + println!("Accepting {} spans", batch.len()); + (self.delay_fn)(self.delay_for).await; + println!("Finish exporting, return result from exporter"); + Ok(()) + } + } + + #[test] + fn test_timeout_std_async() { + async_std::task::block_on(async { + let mut config = BatchConfig::default(); + config.max_export_timeout = time::Duration::from_secs(5); + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush + let mut exporter = BlockingExporter { + delay_for: time::Duration::from_secs(30), + delay_fn: async_std::task::sleep, + }; + let mut processor = BatchSpanProcessor::new( + Box::new(exporter), + async_std::task::spawn, + async_std::stream::interval, + async_std::task::sleep, + config, + ); + async_std::task::sleep(time::Duration::from_secs(1)).await; // skip the first + let start_time = Instant::now(); + processor.on_end(new_test_export_span_data()); + processor.force_flush(); + processor.shutdown(); + let end_time = Instant::now(); + assert!(end_time.sub(start_time).as_secs() < 7); + }) + } + + #[test] + fn test_timeout_tokio() { + let mut runtime = tokio::runtime::Builder::new().threaded_scheduler().enable_all().build().unwrap(); + /// Assert time out will return if exporter takes too long + runtime.block_on(async { + let mut config = BatchConfig::default(); + config.max_export_timeout = time::Duration::from_secs(5); + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush + let mut exporter = BlockingExporter { + delay_for: time::Duration::from_secs(30), + delay_fn: tokio::time::delay_for, + }; + let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); + let mut processor = BatchSpanProcessor::new( + Box::new(exporter), + spawn, + tokio::time::interval, + tokio::time::delay_for, + config, + ); + let start_time = Instant::now(); + processor.on_end(new_test_export_span_data()); + processor.force_flush(); + processor.shutdown(); + let end_time = Instant::now(); + assert!(end_time.sub(start_time).as_secs() < 7); + }) + + /// Assert exporters could return if the time spend is less than timeout. 
+ runtime.block_on(async { + let mut config = BatchConfig::default(); + config.max_export_timeout = time::Duration::from_secs(5); + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush + let mut exporter = BlockingExporter { + delay_for: time::Duration::from_secs(30), + delay_fn: tokio::time::delay_for, + }; + let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); + let mut processor = BatchSpanProcessor::new( + Box::new(exporter), + spawn, + tokio::time::interval, + tokio::time::delay_for, + config, + ); + let start_time = Instant::now(); + processor.on_end(new_test_export_span_data()); + processor.force_flush(); + processor.shutdown(); + let end_time = Instant::now(); + assert!(end_time.sub(start_time).as_secs() < 7); + }) + } } From 177f76368117d45f9fe3fe5110fb30dad8002546 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Thu, 12 Nov 2020 22:10:06 -0500 Subject: [PATCH 02/10] Implement timeout for force_flush and shutdown. Note that the spec only has configuration for maximum time to export data. So the timeout is based on the export time. --- opentelemetry/Cargo.toml | 7 +- opentelemetry/src/sdk/trace/provider.rs | 8 +- opentelemetry/src/sdk/trace/span_processor.rs | 254 +++++++++++++++--- 3 files changed, 221 insertions(+), 48 deletions(-) diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index c7b9d4f804..629e670be6 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -21,17 +21,17 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies] -async-std = { version = "1.6", features = ["unstable"], default-features = false, optional = true } +async-std = { version = "1.6", features = ["unstable"], default-features = false, optional = true } async-trait = { version = "0.1", optional = true } bincode = { version = "1.2", optional = true } dashmap = { version = "4.0.0-rc6", optional = true } fnv = { version = "1.0", optional = true } -futures = "0.3" +futures = { version = "0.3", features = ["std"] } lazy_static = "1.4" percent-encoding = { version = "2.0", optional = true } pin-project = { version = "0.4", optional = true } rand = { version = "0.7", default-features = false, features = ["std"], optional = true } -regex = { version = "1.3", default-features = false, features = ["std", "perf"], optional = true} +regex = { version = "1.3", default-features = false, features = ["std", "perf"], optional = true } serde = { version = "1.0", features = ["derive", "rc"], optional = true } http = { version = "0.2", optional = true } thiserror = { version = "1.0", optional = true } @@ -44,6 +44,7 @@ surf = { version = "2.0", default-features = false, optional = true } criterion = "0.3.1" rand_distr = "0.3.0" tokio = { version = "0.2", features = ["full"] } +async-std = { version = "1.6", features = ["unstable"] } [features] default = ["trace"] diff --git a/opentelemetry/src/sdk/trace/provider.rs b/opentelemetry/src/sdk/trace/provider.rs index c7df06ca1f..de52a9523b 100644 --- a/opentelemetry/src/sdk/trace/provider.rs +++ b/opentelemetry/src/sdk/trace/provider.rs @@ -116,7 +116,12 @@ impl Builder { // drop. We cannot assume we are in a multi-threaded tokio runtime here, so use // `spawn_blocking` to avoid blocking the main thread. 
let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); - let batch = sdk::trace::BatchSpanProcessor::builder(exporter, spawn, tokio::time::interval); + let batch = sdk::trace::BatchSpanProcessor::builder( + exporter, + spawn, + tokio::time::delay_for, + tokio::time::interval, + ); self.with_batch_exporter(batch.build()) } @@ -127,6 +132,7 @@ impl Builder { let batch = sdk::trace::BatchSpanProcessor::builder( exporter, async_std::task::spawn, + async_std::task::sleep, async_std::stream::interval, ); self.with_batch_exporter(batch.build()) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 1d136fe62e..e32a8b1e9c 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -35,15 +35,15 @@ //! [`TracerProvider`]: ../provider/trait.TracerProvider.html use crate::sdk::trace::Span; use crate::{ - exporter::trace::{SpanData, SpanExporter}, + exporter::trace::{ExportResult, SpanData, SpanExporter}, Context, }; -use futures::{channel::mpsc, executor, future::BoxFuture, Future, FutureExt, Stream, StreamExt}; -use std::fmt; -use std::pin::Pin; -use std::str::FromStr; -use std::sync::Mutex; -use std::time; +use futures::{ + channel::mpsc, executor, future::BoxFuture, future::Either, pin_mut, Future, FutureExt, Stream, + StreamExt, +}; +use std::fmt::Display; +use std::{error::Error, fmt, pin::Pin, str::FromStr, sync::Mutex, time}; /// Delay interval between two consecutive exports, default to be 5000. const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS"; @@ -57,6 +57,10 @@ const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2048; const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; +/// Maximum allowed time to export data +// const OTEL_BSP_EXPORT_TIMEOUT_MILLIS: &str = "OTEL_BSP_EXPORT_TIMEOUT_MILLIS"; +/// Default maximum allowed time to export data +const OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT: u64 = 30000; /// `SpanProcessor` is an interface which allows hooks for span start and end /// method invocations. The span processors are invoked only when is_recording @@ -160,7 +164,7 @@ impl SpanProcessor for SimpleSpanProcessor { /// // Then build a batch processor. You can use whichever executor you have available, for /// // example if you are using `async-std` instead of `tokio` you can replace the spawn and /// // interval functions with `async_std::task::spawn` and `async_std::stream::interval`. 
-/// let batch = sdktrace::BatchSpanProcessor::builder(exporter, tokio::spawn, tokio::time::interval) +/// let batch = sdktrace::BatchSpanProcessor::builder(exporter, tokio::spawn, tokio::time::delay_for, tokio::time::interval) /// .with_max_queue_size(4096) /// .build(); /// @@ -180,7 +184,7 @@ impl SpanProcessor for SimpleSpanProcessor { /// [`async-std`]: https://async.rs pub struct BatchSpanProcessor { message_sender: Mutex>, - worker_handle: Option + Send + Sync>>>, + worker_handle: Option + Send + Sync>>>, } impl fmt::Debug for BatchSpanProcessor { @@ -228,18 +232,32 @@ enum BatchMessage { Shutdown, } +#[derive(Debug, Default)] +struct TimeoutErr {} + +impl Display for TimeoutErr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("timed out") + } +} + +impl Error for TimeoutErr {} + impl BatchSpanProcessor { - pub(crate) fn new( + pub(crate) fn new( mut exporter: Box, spawn: S, interval: I, + delay: D, config: BatchConfig, ) -> Self - where - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, + where + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush); @@ -264,8 +282,7 @@ impl BatchSpanProcessor { spans.len().saturating_sub(config.max_export_batch_size), ); - // TODO: Surface error through global error handler - let _result = exporter.export(batch).await; + let _result = export_with_timeout(config.max_export_timeout, exporter.as_mut(), &delay, batch).await; } } // Stream has terminated or processor is shutdown, return to finish execution. @@ -275,8 +292,7 @@ impl BatchSpanProcessor { spans.len().saturating_sub(config.max_export_batch_size), ); - // TODO: Surface error through global error handler - let _result = exporter.export(batch).await; + let _result = export_with_timeout(config.max_export_timeout, exporter.as_mut(), &delay, batch).await; } exporter.shutdown(); break; @@ -284,7 +300,7 @@ impl BatchSpanProcessor { } } })) - .map(|_| ()); + .map(|_| ()); // Return batch processor with link to worker BatchSpanProcessor { @@ -294,21 +310,25 @@ impl BatchSpanProcessor { } /// Create a new batch processor builder - pub fn builder( + pub fn builder( exporter: E, spawn: S, + delay: D, interval: I, - ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + ) -> BatchSpanProcessorBuilder + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, { BatchSpanProcessorBuilder { exporter, spawn, interval, + delay, config: Default::default(), } } @@ -320,16 +340,19 @@ impl BatchSpanProcessor { /// Note that export batch size should be less than or equals to max queue size. 
/// If export batch size is larger than max queue size, we will lower to be the same as max /// queue size - pub fn from_env( + pub fn from_env( exporter: E, spawn: S, interval: I, - ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + delay: D, + ) -> BatchSpanProcessorBuilder + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, { let mut config = BatchConfig::default(); let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) @@ -361,11 +384,34 @@ impl BatchSpanProcessor { config, exporter, spawn, + delay, interval, } } } +async fn export_with_timeout( + time_out: time::Duration, + exporter: &mut E, + delay: &D, + batch: Vec, +) -> ExportResult + where + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, + E: SpanExporter + ?Sized, +{ + let export = exporter.export(batch); + let timeout = delay(time_out); + pin_mut!(export); + pin_mut!(timeout); + // TODO: Surface error through global error handler + match futures::future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => ExportResult::Err(Box::new(TimeoutErr::default())), + } +} + /// Batch span processor configuration #[derive(Debug)] pub struct BatchConfig { @@ -382,6 +428,9 @@ pub struct BatchConfig { /// of spans one batch after the other without any delay. The default value /// is 512. max_export_batch_size: usize, + + /// The maximum duration to export a batch of data. + max_export_timeout: time::Duration, } impl Default for BatchConfig { @@ -390,6 +439,7 @@ impl Default for BatchConfig { max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, scheduled_delay: time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT), max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT, + max_export_timeout: time::Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT), } } } @@ -398,20 +448,23 @@ impl Default for BatchConfig { /// /// [`BatchSpanProcessor`]: struct.BatchSpanProcessor.html #[derive(Debug)] -pub struct BatchSpanProcessorBuilder { +pub struct BatchSpanProcessorBuilder { exporter: E, interval: I, spawn: S, + delay: D, config: BatchConfig, } -impl BatchSpanProcessorBuilder -where - E: SpanExporter + 'static, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, +impl BatchSpanProcessorBuilder + where + E: SpanExporter + 'static, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { @@ -429,6 +482,14 @@ where BatchSpanProcessorBuilder { config, ..self } } + /// Set max timeout for exporting. + pub fn with_max_timeout(self, timeout: time::Duration) -> Self { + let mut config = self.config; + config.max_export_timeout = timeout; + + BatchSpanProcessorBuilder { config, ..self } + } + /// Set max export size for batches, should always less than or equals to max queue size. 
/// /// If input is larger than max queue size, will lower it to be equal to max queue size @@ -449,6 +510,7 @@ where Box::new(self.exporter), self.spawn, self.interval, + self.delay, self.config, ) } @@ -461,13 +523,17 @@ mod tests { OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS, OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, }; - use crate::exporter::trace::stdout; + use crate::exporter::trace::{stdout, ExportResult, SpanData, SpanExporter}; use crate::sdk::trace::BatchConfig; use crate::testing::trace::{ new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, }; + use async_std::prelude::*; + use async_trait::async_trait; + use std::fmt::Debug; + use std::ops::Sub; use std::time; - use tokio::time::Duration; + use std::time::{Duration, Instant}; #[test] fn simple_span_processor_on_end_calls_export() { @@ -494,6 +560,7 @@ mod tests { stdout::Exporter::new(std::io::stdout(), true), tokio::spawn, tokio::time::interval, + tokio::time::delay_for, ); // export batch size cannot exceed max queue size assert_eq!(builder.config.max_export_batch_size, 500); @@ -511,6 +578,7 @@ mod tests { stdout::Exporter::new(std::io::stdout(), true), tokio::spawn, tokio::time::interval, + tokio::time::delay_for, ); assert_eq!(builder.config.max_export_batch_size, 120); @@ -526,6 +594,7 @@ mod tests { Box::new(exporter), tokio::spawn, tokio::time::interval, + tokio::time::delay_for, config, ); let handle = tokio::spawn(async move { @@ -547,4 +616,101 @@ mod tests { "timed out in 5 seconds. force_flush may not export any data when called" ); } + + struct BlockingExporter { + delay_for: time::Duration, + delay_fn: D, + } + + impl Debug for BlockingExporter + where + D: Fn(time::Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static, + { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("blocking exporter for testing") + } + } + + #[async_trait] + impl SpanExporter for BlockingExporter + where + D: Fn(time::Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static, + { + async fn export(&mut self, batch: Vec) -> ExportResult { + println!("Accepting {} spans", batch.len()); + (self.delay_fn)(self.delay_for).await; + println!("Finish exporting, return result from exporter"); + Ok(()) + } + } + + #[test] + fn test_timeout() { + // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s. + // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s. + // Either way, the test should be finished within 5s. But given the delay functions are not + // always accurate. We set the threshold to 7s. + let mut runtime = tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); + runtime.block_on(timeout_test_tokio(true)); + runtime.block_on(timeout_test_tokio(false)); + + async_std::task::block_on(timeout_test_std_async(true)); + async_std::task::block_on(timeout_test_std_async(false)); + } + + // If the time_out is true, then the result suppose to ended with timeout. otherwise the exporter should be able to export within time out duration. 
+ async fn timeout_test_std_async(time_out: bool) { + let mut config = BatchConfig::default(); + config.max_export_timeout = time::Duration::from_secs(if time_out { 5 } else { 60 }); + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush + let exporter = BlockingExporter { + delay_for: time::Duration::from_secs(if !time_out { 5 } else { 60 }), + delay_fn: async_std::task::sleep, + }; + let mut processor = BatchSpanProcessor::new( + Box::new(exporter), + async_std::task::spawn, + async_std::stream::interval, + async_std::task::sleep, + config, + ); + let start_time = Instant::now(); + processor.on_end(new_test_export_span_data()); + processor.force_flush(); + processor.shutdown(); + let end_time = Instant::now(); + assert!(end_time.sub(start_time).as_secs() < 7); + } + + // If the time_out is true, then the result suppose to ended with timeout. otherwise the exporter should be able to export within time out duration. + async fn timeout_test_tokio(time_out: bool) { + let mut config = BatchConfig::default(); + config.max_export_timeout = time::Duration::from_secs(if time_out { 5 } else { 60 }); + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush + let exporter = BlockingExporter { + delay_for: time::Duration::from_secs(if !time_out { 5 } else { 60 }), + delay_fn: tokio::time::delay_for, + }; + let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); + let mut processor = BatchSpanProcessor::new( + Box::new(exporter), + spawn, + tokio::time::interval, + tokio::time::delay_for, + config, + ); + tokio::time::delay_for(time::Duration::from_secs(1)).await; // skip the first + let start_time = Instant::now(); + processor.on_end(new_test_export_span_data()); + processor.force_flush(); + processor.shutdown(); + let end_time = Instant::now(); + assert!(end_time.sub(start_time).as_secs() < 7); + } } From 8b055929938e0f95cc52c417cea67f3fa8214e1c Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Sat, 14 Nov 2020 16:13:13 -0500 Subject: [PATCH 03/10] * Add TraceError enum in trace API. * Block force_flush until the export results returned from worker. * Return ExportResult for force_flush and shutdown. 
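
The force_flush and shutdown changes below work by handing the worker task a oneshot sender and blocking the caller on the matching receiver until the export results come back. A rough sketch of that request/reply pattern over an mpsc + oneshot pair (the message and result types are illustrative, not the crate's actual ones):

    use futures::channel::{mpsc, oneshot};
    use futures::StreamExt;

    enum Message {
        // Carries the channel on which the worker reports the flush outcome.
        Flush(oneshot::Sender<Result<(), String>>),
    }

    fn main() {
        let (mut tx, mut rx) = mpsc::channel(16);

        // Worker: drain messages and answer each flush request.
        let worker = async_std::task::spawn(async move {
            while let Some(Message::Flush(reply)) = rx.next().await {
                let _ = reply.send(Ok(()));
            }
        });

        // Caller side, mirroring force_flush: send the request, then block
        // until the worker has replied.
        let (reply_tx, reply_rx) = oneshot::channel();
        tx.try_send(Message::Flush(reply_tx)).unwrap();
        let result = futures::executor::block_on(reply_rx).unwrap();
        assert!(result.is_ok());

        drop(tx);
        async_std::task::block_on(worker);
    }
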
--- opentelemetry/Cargo.toml | 10 +- opentelemetry/src/api/trace/mod.rs | 11 + opentelemetry/src/exporter/trace/mod.rs | 15 +- opentelemetry/src/sdk/trace/provider.rs | 10 +- opentelemetry/src/sdk/trace/span_processor.rs | 431 ++++++++++-------- 5 files changed, 283 insertions(+), 194 deletions(-) diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index cf1a688b9c..d09b814495 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -21,17 +21,17 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [dependencies] -async-std = { version = "1.6", features = ["unstable"], default-features = false, optional = true } +async-std = { version = "1.6", features = ["unstable"], default-features = false, optional = true } async-trait = { version = "0.1", optional = true } bincode = { version = "1.2", optional = true } dashmap = { version = "4.0.0-rc6", optional = true } fnv = { version = "1.0", optional = true } -futures = "0.3" +futures = { version = "0.3", features = ["std"] } lazy_static = "1.4" percent-encoding = { version = "2.0", optional = true } pin-project = { version = "0.4", optional = true } rand = { version = "0.7", default-features = false, features = ["std"], optional = true } -regex = { version = "1.3", default-features = false, features = ["std", "perf"], optional = true} +regex = { version = "1.3", default-features = false, features = ["std", "perf"], optional = true } serde = { version = "1.0", features = ["derive", "rc"], optional = true } http = { version = "0.2", optional = true } thiserror = { version = "1.0", optional = true } @@ -44,11 +44,11 @@ surf = { version = "2.0", default-features = false, optional = true } criterion = "0.3.1" rand_distr = "0.3.0" tokio = { version = "0.2", features = ["full"] } -async-std = { version = "1.6", features = ["unstable"]} +async-std = { version = "1.6", features = ["unstable"] } [features] default = ["trace"] -trace = ["rand", "pin-project", "async-trait", "regex", "percent-encoding"] +trace = ["rand", "pin-project", "async-trait", "regex", "percent-encoding", "thiserror"] metrics = ["thiserror", "dashmap", "fnv"] serialize = ["serde", "bincode"] diff --git a/opentelemetry/src/api/trace/mod.rs b/opentelemetry/src/api/trace/mod.rs index 7eba2b1715..776fed0151 100644 --- a/opentelemetry/src/api/trace/mod.rs +++ b/opentelemetry/src/api/trace/mod.rs @@ -136,3 +136,14 @@ pub use self::{ }, tracer::{SpanBuilder, Tracer}, }; + +use thiserror::Error; + +/// Errors returned by the trace API. +#[derive(Error, Debug, PartialEq)] +#[non_exhaustive] +pub enum TraceError { + /// Other errors not covered by specific cases. + #[error("Trace error: {0}")] + Other(String), +} diff --git a/opentelemetry/src/exporter/trace/mod.rs b/opentelemetry/src/exporter/trace/mod.rs index bde2349d62..549a448c42 100644 --- a/opentelemetry/src/exporter/trace/mod.rs +++ b/opentelemetry/src/exporter/trace/mod.rs @@ -10,7 +10,8 @@ use http::Request; use serde::{Deserialize, Serialize}; #[cfg(all(feature = "http", feature = "reqwest"))] use std::convert::TryInto; -use std::fmt::Debug; +use std::error::Error; +use std::fmt::{Debug, Display}; use std::sync::Arc; use std::time::SystemTime; @@ -19,6 +20,18 @@ pub mod stdout; /// Describes the result of an export. 
pub type ExportResult = Result<(), Box>; +/// Timed out when exporting spans to remote +#[derive(Debug, Default)] +pub struct ExportTimedOutError {} + +impl Display for ExportTimedOutError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("export timed out") + } +} + +impl Error for ExportTimedOutError {} + /// `SpanExporter` defines the interface that protocol-specific exporters must /// implement so that they can be plugged into OpenTelemetry SDK and support /// sending of telemetry data. diff --git a/opentelemetry/src/sdk/trace/provider.rs b/opentelemetry/src/sdk/trace/provider.rs index c7df06ca1f..146316e78c 100644 --- a/opentelemetry/src/sdk/trace/provider.rs +++ b/opentelemetry/src/sdk/trace/provider.rs @@ -25,7 +25,7 @@ pub(crate) struct TracerProviderInner { impl Drop for TracerProviderInner { fn drop(&mut self) { for processor in &mut self.processors { - processor.shutdown(); + let _result = processor.shutdown(); } } } @@ -116,7 +116,12 @@ impl Builder { // drop. We cannot assume we are in a multi-threaded tokio runtime here, so use // `spawn_blocking` to avoid blocking the main thread. let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); - let batch = sdk::trace::BatchSpanProcessor::builder(exporter, spawn, tokio::time::interval); + let batch = sdk::trace::BatchSpanProcessor::builder( + exporter, + spawn, + tokio::time::delay_for, + tokio::time::interval, + ); self.with_batch_exporter(batch.build()) } @@ -127,6 +132,7 @@ impl Builder { let batch = sdk::trace::BatchSpanProcessor::builder( exporter, async_std::task::spawn, + async_std::task::sleep, async_std::stream::interval, ); self.with_batch_exporter(batch.build()) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 89e012c21b..4f256e5404 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -33,22 +33,18 @@ //! //! [`is_recording`]: ../span/trait.Span.html#method.is_recording //! [`TracerProvider`]: ../provider/trait.TracerProvider.html +use crate::api::trace::TraceError; +use crate::exporter::trace::ExportTimedOutError; use crate::sdk::trace::Span; use crate::{ - exporter::trace::{SpanData, SpanExporter}, + exporter::trace::{ExportResult, SpanData, SpanExporter}, Context, }; -use futures::{channel::mpsc, executor, future::BoxFuture, Future, FutureExt, Stream, StreamExt}; -use std::fmt; -use std::pin::Pin; -use std::str::FromStr; -use std::sync::Mutex; -use std::time; -use crate::exporter::trace::ExportResult; -use std::error::Error; -use std::fmt::Display; -use futures::pin_mut; -use futures::future::Either; +use futures::{ + channel::mpsc, channel::oneshot, executor, future::BoxFuture, future::Either, pin_mut, Future, + FutureExt, Stream, StreamExt, +}; +use std::{fmt, pin::Pin, str::FromStr, sync::Mutex, time}; /// Delay interval between two consecutive exports, default to be 5000. const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS"; @@ -80,10 +76,10 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// API, therefore it should not block or throw an exception. fn on_end(&self, span: SpanData); /// Force the spans lying in the cache to be exported. - fn force_flush(&self); + fn force_flush(&self) -> ExportResult; /// Shuts down the processor. Called when SDK is shut down. This is an /// opportunity for processors to do any cleanup required. 
- fn shutdown(&mut self); + fn shutdown(&mut self) -> ExportResult; } /// A [`SpanProcessor`] that exports synchronously when spans are finished. @@ -134,13 +130,17 @@ impl SpanProcessor for SimpleSpanProcessor { } } - fn force_flush(&self) { + fn force_flush(&self) -> ExportResult { // Ignored since all spans in Simple Processor will be exported as they ended. + Ok(()) } - fn shutdown(&mut self) { + fn shutdown(&mut self) -> ExportResult { if let Ok(mut exporter) = self.exporter.lock() { exporter.shutdown(); + Ok(()) + } else { + Err(TraceError::Other("When force flushing the SimpleSpanProcessor, the exporter's lock has been positioned".into()).into()) } } } @@ -169,7 +169,7 @@ impl SpanProcessor for SimpleSpanProcessor { /// // Then build a batch processor. You can use whichever executor you have available, for /// // example if you are using `async-std` instead of `tokio` you can replace the spawn and /// // interval functions with `async_std::task::spawn` and `async_std::stream::interval`. -/// let batch = sdktrace::BatchSpanProcessor::builder(exporter, tokio::spawn, tokio::time::interval) +/// let batch = sdktrace::BatchSpanProcessor::builder(exporter, tokio::spawn, tokio::time::delay_for, tokio::time::interval) /// .with_max_queue_size(4096) /// .build(); /// @@ -189,7 +189,7 @@ impl SpanProcessor for SimpleSpanProcessor { /// [`async-std`]: https://async.rs pub struct BatchSpanProcessor { message_sender: Mutex>, - worker_handle: Option + Send + Sync>>>, + _worker_handle: Option + Send + Sync>>>, } impl fmt::Debug for BatchSpanProcessor { @@ -211,43 +211,38 @@ impl SpanProcessor for BatchSpanProcessor { } } - fn force_flush(&self) { - if let Ok(mut sender) = self.message_sender.lock() { - let _ = sender.try_send(BatchMessage::Flush); + fn force_flush(&self) -> ExportResult { + let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When force flushing the BatchSpanProcessor, the message sender's lock has been positioned".into()))?; + let (res_sender, res_receiver) = oneshot::channel::>(); + sender.try_send(BatchMessage::Flush(Some(res_sender)))?; + for result in futures::executor::block_on(res_receiver)? { + if result.is_err() { + return result; + } } + Ok(()) } - fn shutdown(&mut self) { - if let Ok(mut sender) = self.message_sender.lock() { - // Send shutdown message to worker future - if sender.try_send(BatchMessage::Shutdown).is_ok() { - if let Some(worker_handle) = self.worker_handle.take() { - // Block waiting for worker to shut down if sending was successful - futures::executor::block_on(worker_handle) - } + fn shutdown(&mut self) -> ExportResult { + let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When force flushing the BatchSpanProcessor, the message sender's lock has been positioned".into()))?; + let (res_sender, res_receiver) = oneshot::channel::>(); + sender.try_send(BatchMessage::Shutdown(res_sender))?; + for result in futures::executor::block_on(res_receiver)? 
{ + if result.is_err() { + return result; } } + Ok(()) } } #[derive(Debug)] enum BatchMessage { ExportSpan(SpanData), - Flush, - Shutdown, -} - -#[derive(Debug, Default)] -struct TimeoutErr {} - -impl Display for TimeoutErr { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("timed out") - } + Flush(Option>>), + Shutdown(oneshot::Sender>), } -impl Error for TimeoutErr {} - impl BatchSpanProcessor { pub(crate) fn new( mut exporter: Box, @@ -256,16 +251,16 @@ impl BatchSpanProcessor { delay: D, config: BatchConfig, ) -> Self - where - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, - D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, - DS: Future + 'static + Send + Sync, + where + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, { let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size); - let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush); + let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush(None)); // Spawn worker process via user-defined spawn function. let worker_handle = spawn(Box::pin(async move { @@ -281,71 +276,98 @@ impl BatchSpanProcessor { } } // Span batch interval time reached or a force flush has been invoked, export current spans. - BatchMessage::Flush => { + BatchMessage::Flush(Some(ch)) => { + let mut results = Vec::with_capacity( + spans.len() / config.max_export_batch_size + 1 as usize, + ); while !spans.is_empty() { let batch = spans.split_off( spans.len().saturating_sub(config.max_export_batch_size), ); - let export = exporter.export(batch); - let timeout = delay(config.max_export_timeout); - pin_mut!(export); - pin_mut!(timeout); - // TODO: Surface error through global error handler - let _result = match futures::future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => ExportResult::Err(Box::new(TimeoutErr::default())), - }; + results.push( + export_with_timeout( + config.max_export_timeout, + exporter.as_mut(), + &delay, + batch, + ) + .await, + ); + } + let _send_result = ch.send(results); + } + BatchMessage::Flush(None) => { + while !spans.is_empty() { + let batch = spans.split_off( + spans.len().saturating_sub(config.max_export_batch_size), + ); + + let _result = export_with_timeout( + config.max_export_timeout, + exporter.as_mut(), + &delay, + batch, + ) + .await; } } // Stream has terminated or processor is shutdown, return to finish execution. 
- BatchMessage::Shutdown => { + BatchMessage::Shutdown(ch) => { + let mut results = Vec::with_capacity( + spans.len() / config.max_export_batch_size + 1 as usize, + ); while !spans.is_empty() { let batch = spans.split_off( spans.len().saturating_sub(config.max_export_batch_size), ); - let export = exporter.export(batch); - let timeout = delay(config.max_export_timeout); - pin_mut!(export); - pin_mut!(timeout); - // TODO: Surface error through global error handler - let _result = match futures::future::select(export, timeout).await { - Either::Left((export_res, _)) => export_res, - Either::Right((_, _)) => ExportResult::Err(Box::new(TimeoutErr::default())), - }; + results.push( + export_with_timeout( + config.max_export_timeout, + exporter.as_mut(), + &delay, + batch, + ) + .await, + ); } exporter.shutdown(); + let _send_result = ch.send(results); break; } } } })) - .map(|_| ()); + .map(|_| ()); // Return batch processor with link to worker BatchSpanProcessor { message_sender: Mutex::new(message_sender), - worker_handle: Some(Box::pin(worker_handle)), + _worker_handle: Some(Box::pin(worker_handle)), } } /// Create a new batch processor builder - pub fn builder( + pub fn builder( exporter: E, spawn: S, + delay: D, interval: I, - ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + ) -> BatchSpanProcessorBuilder + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, { BatchSpanProcessorBuilder { exporter, spawn, interval, + delay, config: Default::default(), } } @@ -357,16 +379,19 @@ impl BatchSpanProcessor { /// Note that export batch size should be less than or equals to max queue size. 
/// If export batch size is larger than max queue size, we will lower to be the same as max /// queue size - pub fn from_env( + pub fn from_env( exporter: E, spawn: S, interval: I, - ) -> BatchSpanProcessorBuilder - where - E: SpanExporter, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IO, + delay: D, + ) -> BatchSpanProcessorBuilder + where + E: SpanExporter, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IO, + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, { let mut config = BatchConfig::default(); let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS) @@ -398,11 +423,34 @@ impl BatchSpanProcessor { config, exporter, spawn, + delay, interval, } } } +async fn export_with_timeout( + time_out: time::Duration, + exporter: &mut E, + delay: &D, + batch: Vec, +) -> ExportResult +where + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, + E: SpanExporter + ?Sized, +{ + let export = exporter.export(batch); + let timeout = delay(time_out); + pin_mut!(export); + pin_mut!(timeout); + // TODO: Surface error through global error handler + match futures::future::select(export, timeout).await { + Either::Left((export_res, _)) => export_res, + Either::Right((_, _)) => ExportResult::Err(Box::new(ExportTimedOutError::default())), + } +} + /// Batch span processor configuration #[derive(Debug)] pub struct BatchConfig { @@ -439,20 +487,23 @@ impl Default for BatchConfig { /// /// [`BatchSpanProcessor`]: struct.BatchSpanProcessor.html #[derive(Debug)] -pub struct BatchSpanProcessorBuilder { +pub struct BatchSpanProcessorBuilder { exporter: E, interval: I, spawn: S, + delay: D, config: BatchConfig, } -impl BatchSpanProcessorBuilder - where - E: SpanExporter + 'static, - S: Fn(BoxFuture<'static, ()>) -> SH, - SH: Future + Send + Sync + 'static, - I: Fn(time::Duration) -> IS, - IS: Stream + Send + 'static, +impl BatchSpanProcessorBuilder +where + E: SpanExporter + 'static, + S: Fn(BoxFuture<'static, ()>) -> SH, + SH: Future + Send + Sync + 'static, + I: Fn(time::Duration) -> IS, + IS: Stream + Send + 'static, + D: (Fn(time::Duration) -> DS) + Send + Sync + 'static, + DS: Future + 'static + Send + Sync, { /// Set max queue size for batches pub fn with_max_queue_size(self, size: usize) -> Self { @@ -470,6 +521,14 @@ impl BatchSpanProcessorBuilder BatchSpanProcessorBuilder { config, ..self } } + /// Set max timeout for exporting. + pub fn with_max_timeout(self, timeout: time::Duration) -> Self { + let mut config = self.config; + config.max_export_timeout = timeout; + + BatchSpanProcessorBuilder { config, ..self } + } + /// Set max export size for batches, should always less than or equals to max queue size. 
/// /// If input is larger than max queue size, will lower it to be equal to max queue size @@ -490,32 +549,29 @@ impl BatchSpanProcessorBuilder Box::new(self.exporter), self.spawn, self.interval, - tokio::time::delay_for, + self.delay, self.config, ) } } #[cfg(test)] -#[allow(warnings)] mod tests { use super::{ BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS, OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, }; - use crate::exporter::trace::{stdout, SpanExporter, SpanData, ExportResult}; - use futures::pin_mut; + use crate::exporter::trace::{stdout, ExportResult, SpanData, SpanExporter}; use crate::sdk::trace::BatchConfig; use crate::testing::trace::{ new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, }; - use std::time; - use async_trait::async_trait; - use std::time::{Instant, Duration}; use async_std::prelude::*; - use std::ops::Sub; + use async_trait::async_trait; use std::fmt::Debug; + use std::time; + use std::time::Duration; #[test] fn simple_span_processor_on_end_calls_export() { @@ -529,7 +585,7 @@ mod tests { fn simple_span_processor_shutdown_calls_shutdown() { let (exporter, _rx_export, rx_shutdown) = new_test_exporter(); let mut processor = SimpleSpanProcessor::new(Box::new(exporter)); - processor.shutdown(); + let _result = processor.shutdown(); assert!(rx_shutdown.try_recv().is_ok()); } @@ -542,6 +598,7 @@ mod tests { stdout::Exporter::new(std::io::stdout(), true), tokio::spawn, tokio::time::interval, + tokio::time::delay_for, ); // export batch size cannot exceed max queue size assert_eq!(builder.config.max_export_batch_size, 500); @@ -559,6 +616,7 @@ mod tests { stdout::Exporter::new(std::io::stdout(), true), tokio::spawn, tokio::time::interval, + tokio::time::delay_for, ); assert_eq!(builder.config.max_export_batch_size, 120); @@ -570,9 +628,10 @@ mod tests { let (exporter, mut export_receiver, _shutdown_receiver) = new_tokio_test_exporter(); let mut config = BatchConfig::default(); config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush - let processor = BatchSpanProcessor::new( + let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); + let mut processor = BatchSpanProcessor::new( Box::new(exporter), - tokio::spawn, + spawn, tokio::time::interval, tokio::time::delay_for, config, @@ -587,7 +646,9 @@ mod tests { }); tokio::time::delay_for(Duration::from_secs(1)).await; // skip the first processor.on_end(new_test_export_span_data()); - processor.force_flush(); + let flush_res = processor.force_flush(); + assert!(flush_res.is_ok()); + let _shutdown_result = processor.shutdown(); assert!( tokio::time::timeout(Duration::from_secs(5), handle) @@ -603,8 +664,10 @@ mod tests { } impl Debug for BlockingExporter - where D: Fn(time::Duration) -> DS + 'static + Send + Sync, - DS: Future + Send + Sync + 'static { + where + D: Fn(time::Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static, + { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.write_str("blocking exporter for testing") } @@ -612,8 +675,10 @@ mod tests { #[async_trait] impl SpanExporter for BlockingExporter - where D: Fn(time::Duration) -> DS + 'static + Send + Sync, - DS: Future + Send + Sync + 'static { + where + D: Fn(time::Duration) -> DS + 'static + Send + Sync, + DS: Future + Send + Sync + 'static, + { async fn export(&mut 
self, batch: Vec) -> ExportResult { println!("Accepting {} spans", batch.len()); (self.delay_fn)(self.delay_for).await; @@ -623,83 +688,77 @@ mod tests { } #[test] - fn test_timeout_std_async() { - async_std::task::block_on(async { - let mut config = BatchConfig::default(); - config.max_export_timeout = time::Duration::from_secs(5); - config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush - let mut exporter = BlockingExporter { - delay_for: time::Duration::from_secs(30), - delay_fn: async_std::task::sleep, - }; - let mut processor = BatchSpanProcessor::new( - Box::new(exporter), - async_std::task::spawn, - async_std::stream::interval, - async_std::task::sleep, - config, - ); - async_std::task::sleep(time::Duration::from_secs(1)).await; // skip the first - let start_time = Instant::now(); - processor.on_end(new_test_export_span_data()); - processor.force_flush(); - processor.shutdown(); - let end_time = Instant::now(); - assert!(end_time.sub(start_time).as_secs() < 7); - }) + fn test_timeout() { + // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s. + // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s. + // Either way, the test should be finished within 5s. + let mut runtime = tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); + runtime.block_on(timeout_test_tokio(true)); + runtime.block_on(timeout_test_tokio(false)); + + async_std::task::block_on(timeout_test_std_async(true)); + async_std::task::block_on(timeout_test_std_async(false)); } - #[test] - fn test_timeout_tokio() { - let mut runtime = tokio::runtime::Builder::new().threaded_scheduler().enable_all().build().unwrap(); - /// Assert time out will return if exporter takes too long - runtime.block_on(async { - let mut config = BatchConfig::default(); - config.max_export_timeout = time::Duration::from_secs(5); - config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush - let mut exporter = BlockingExporter { - delay_for: time::Duration::from_secs(30), - delay_fn: tokio::time::delay_for, - }; - let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); - let mut processor = BatchSpanProcessor::new( - Box::new(exporter), - spawn, - tokio::time::interval, - tokio::time::delay_for, - config, - ); - let start_time = Instant::now(); - processor.on_end(new_test_export_span_data()); - processor.force_flush(); - processor.shutdown(); - let end_time = Instant::now(); - assert!(end_time.sub(start_time).as_secs() < 7); - }) - - /// Assert exporters could return if the time spend is less than timeout. 
- runtime.block_on(async { - let mut config = BatchConfig::default(); - config.max_export_timeout = time::Duration::from_secs(5); - config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush - let mut exporter = BlockingExporter { - delay_for: time::Duration::from_secs(30), - delay_fn: tokio::time::delay_for, - }; - let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); - let mut processor = BatchSpanProcessor::new( - Box::new(exporter), - spawn, - tokio::time::interval, - tokio::time::delay_for, - config, - ); - let start_time = Instant::now(); - processor.on_end(new_test_export_span_data()); - processor.force_flush(); - processor.shutdown(); - let end_time = Instant::now(); - assert!(end_time.sub(start_time).as_secs() < 7); - }) + // If the time_out is true, then the result suppose to ended with timeout. + // otherwise the exporter should be able to export within time out duration. + async fn timeout_test_std_async(time_out: bool) { + let mut config = BatchConfig::default(); + config.max_export_timeout = time::Duration::from_secs(if time_out { 5 } else { 60 }); + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush + let exporter = BlockingExporter { + delay_for: time::Duration::from_secs(if !time_out { 5 } else { 60 }), + delay_fn: async_std::task::sleep, + }; + let mut processor = BatchSpanProcessor::new( + Box::new(exporter), + async_std::task::spawn, + async_std::stream::interval, + async_std::task::sleep, + config, + ); + processor.on_end(new_test_export_span_data()); + let flush_res = processor.force_flush(); + if time_out { + assert!(flush_res.is_err()); + } else { + assert!(flush_res.is_ok()); + } + let shutdown_res = processor.shutdown(); + assert!(shutdown_res.is_ok()); + } + + // If the time_out is true, then the result suppose to ended with timeout. + // otherwise the exporter should be able to export within time out duration. + async fn timeout_test_tokio(time_out: bool) { + let mut config = BatchConfig::default(); + config.max_export_timeout = time::Duration::from_secs(if time_out { 5 } else { 60 }); + config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush + let exporter = BlockingExporter { + delay_for: time::Duration::from_secs(if !time_out { 5 } else { 60 }), + delay_fn: tokio::time::delay_for, + }; + let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); + let mut processor = BatchSpanProcessor::new( + Box::new(exporter), + spawn, + tokio::time::interval, + tokio::time::delay_for, + config, + ); + tokio::time::delay_for(time::Duration::from_secs(1)).await; // skip the first + processor.on_end(new_test_export_span_data()); + let flush_res = processor.force_flush(); + if time_out { + assert!(flush_res.is_err()); + } else { + assert!(flush_res.is_ok()); + } + let shutdown_res = processor.shutdown(); + assert!(shutdown_res.is_ok()); } } From 417ff29d744e799a1c82b479cc0f4efec8e9060a Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Sat, 14 Nov 2020 20:49:35 -0500 Subject: [PATCH 04/10] Allow users to set export time out from env var. 
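
The change below reads OTEL_BSP_EXPORT_TIMEOUT_MILLIS the same way the other OTEL_BSP_* variables are read: use the value if it parses as u64 milliseconds, otherwise fall back to the 30s default. Extracted as a standalone sketch (the constants mirror the ones defined in span_processor.rs):

    use std::str::FromStr;
    use std::time::Duration;

    const OTEL_BSP_EXPORT_TIMEOUT_MILLIS: &str = "OTEL_BSP_EXPORT_TIMEOUT_MILLIS";
    const OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT: u64 = 30_000;

    fn export_timeout_from_env() -> Duration {
        let millis = std::env::var(OTEL_BSP_EXPORT_TIMEOUT_MILLIS)
            .map(|timeout| u64::from_str(&timeout).unwrap_or(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT))
            .unwrap_or(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT);
        Duration::from_millis(millis)
    }

    fn main() {
        std::env::set_var(OTEL_BSP_EXPORT_TIMEOUT_MILLIS, "2046");
        assert_eq!(export_timeout_from_env(), Duration::from_millis(2046));

        std::env::set_var(OTEL_BSP_EXPORT_TIMEOUT_MILLIS, "not a number");
        assert_eq!(export_timeout_from_env(), Duration::from_millis(30_000));
    }
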
--- opentelemetry/src/sdk/trace/span_processor.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 4f256e5404..542ce3b6cd 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -59,7 +59,7 @@ const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"; /// Default maximum batch size const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512; /// Maximum allowed time to export data -// const OTEL_BSP_EXPORT_TIMEOUT_MILLIS: &str = "OTEL_BSP_EXPORT_TIMEOUT_MILLIS"; +const OTEL_BSP_EXPORT_TIMEOUT_MILLIS: &str = "OTEL_BSP_EXPORT_TIMEOUT_MILLIS"; /// Default maximum allowed time to export data const OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT: u64 = 30000; @@ -419,6 +419,13 @@ impl BatchSpanProcessor { config.max_export_batch_size = max_export_batch_size; } + let max_export_time_out = std::env::var(OTEL_BSP_EXPORT_TIMEOUT_MILLIS) + .map(|timeout| { + u64::from_str(&timeout).unwrap_or(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT) + }) + .unwrap_or(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT); + config.max_export_timeout = time::Duration::from_millis(max_export_time_out); + BatchSpanProcessorBuilder { config, exporter, @@ -563,6 +570,7 @@ mod tests { OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT, }; use crate::exporter::trace::{stdout, ExportResult, SpanData, SpanExporter}; + use crate::sdk::trace::span_processor::OTEL_BSP_EXPORT_TIMEOUT_MILLIS; use crate::sdk::trace::BatchConfig; use crate::testing::trace::{ new_test_export_span_data, new_test_exporter, new_tokio_test_exporter, @@ -592,6 +600,7 @@ mod tests { #[test] fn test_build_batch_span_processor_from_env() { std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500"); + std::env::set_var(OTEL_BSP_EXPORT_TIMEOUT_MILLIS, "2046"); std::env::set_var(OTEL_BSP_SCHEDULE_DELAY_MILLIS, "I am not number"); let mut builder = BatchSpanProcessor::from_env( @@ -610,6 +619,10 @@ mod tests { builder.config.max_queue_size, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT ); + assert_eq!( + builder.config.max_export_timeout, + time::Duration::from_millis(2046) + ); std::env::set_var(OTEL_BSP_MAX_QUEUE_SIZE, "120"); builder = BatchSpanProcessor::from_env( From 27da8196cb86191b4d5bb1f37094e941a6fa1dd1 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Tue, 17 Nov 2020 20:31:31 -0500 Subject: [PATCH 05/10] Address comments. --- opentelemetry/src/sdk/trace/span_processor.rs | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 542ce3b6cd..50d232b4da 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -76,10 +76,10 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// API, therefore it should not block or throw an exception. fn on_end(&self, span: SpanData); /// Force the spans lying in the cache to be exported. - fn force_flush(&self) -> ExportResult; + fn force_flush(&self) -> Result<(), Box>; /// Shuts down the processor. Called when SDK is shut down. This is an /// opportunity for processors to do any cleanup required. - fn shutdown(&mut self) -> ExportResult; + fn shutdown(&mut self) -> Result<(), Box>; } /// A [`SpanProcessor`] that exports synchronously when spans are finished. 
@@ -130,17 +130,17 @@ impl SpanProcessor for SimpleSpanProcessor { } } - fn force_flush(&self) -> ExportResult { + fn force_flush(&self) -> Result<(), Box> { // Ignored since all spans in Simple Processor will be exported as they ended. Ok(()) } - fn shutdown(&mut self) -> ExportResult { + fn shutdown(&mut self) -> Result<(), Box> { if let Ok(mut exporter) = self.exporter.lock() { exporter.shutdown(); Ok(()) } else { - Err(TraceError::Other("When force flushing the SimpleSpanProcessor, the exporter's lock has been positioned".into()).into()) + Err(TraceError::Other("When force flushing the SimpleSpanProcessor, the exporter's lock has been poisoned".into()).into()) } } } @@ -211,8 +211,8 @@ impl SpanProcessor for BatchSpanProcessor { } } - fn force_flush(&self) -> ExportResult { - let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When force flushing the BatchSpanProcessor, the message sender's lock has been positioned".into()))?; + fn force_flush(&self) -> Result<(), Box> { + let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When force flushing the BatchSpanProcessor, the message sender's lock has been poisoned".into()))?; let (res_sender, res_receiver) = oneshot::channel::>(); sender.try_send(BatchMessage::Flush(Some(res_sender)))?; for result in futures::executor::block_on(res_receiver)? { @@ -223,8 +223,8 @@ impl SpanProcessor for BatchSpanProcessor { Ok(()) } - fn shutdown(&mut self) -> ExportResult { - let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When force flushing the BatchSpanProcessor, the message sender's lock has been positioned".into()))?; + fn shutdown(&mut self) -> Result<(), Box> { + let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When force flushing the BatchSpanProcessor, the message sender's lock has been poisoned".into()))?; let (res_sender, res_receiver) = oneshot::channel::>(); sender.try_send(BatchMessage::Shutdown(res_sender))?; for result in futures::executor::block_on(res_receiver)? { @@ -295,6 +295,7 @@ impl BatchSpanProcessor { .await, ); } + // TODO: Surface error through global error handler let _send_result = ch.send(results); } BatchMessage::Flush(None) => { @@ -333,6 +334,7 @@ impl BatchSpanProcessor { ); } exporter.shutdown(); + // TODO: Surface error through global error handler let _send_result = ch.send(results); break; } @@ -451,7 +453,6 @@ where let timeout = delay(time_out); pin_mut!(export); pin_mut!(timeout); - // TODO: Surface error through global error handler match futures::future::select(export, timeout).await { Either::Left((export_res, _)) => export_res, Either::Right((_, _)) => ExportResult::Err(Box::new(ExportTimedOutError::default())), @@ -701,7 +702,7 @@ mod tests { } #[test] - fn test_timeout() { + fn test_timeout_tokio_timeout() { // If time_out is true, then we ask exporter to block for 60s and set timeout to 5s. // If time_out is false, then we ask the exporter to block for 5s and set timeout to 60s. // Either way, the test should be finished within 5s. 
@@ -711,9 +712,25 @@ .build() .unwrap(); runtime.block_on(timeout_test_tokio(true)); + } + + #[test] + fn test_timeout_tokio_not_timeout() { + let mut runtime = tokio::runtime::Builder::new() + .threaded_scheduler() + .enable_all() + .build() + .unwrap(); runtime.block_on(timeout_test_tokio(false)); + } + #[test] + fn test_timeout_async_std_timeout() { async_std::task::block_on(timeout_test_std_async(true)); + } + + #[test] + fn test_timeout_async_std_not_timeout() { async_std::task::block_on(timeout_test_std_async(false)); }

From e6c438ab8bb9c933fe10d1dc0f9d0046edc71064 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Tue, 17 Nov 2020 20:36:27 -0500 Subject: [PATCH 06/10] Address comments. --- opentelemetry/src/sdk/trace/span_processor.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 50d232b4da..08249b0c6a 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -738,10 +738,10 @@ mod tests { // otherwise the exporter should be able to finish exporting within the timeout. async fn timeout_test_std_async(time_out: bool) { let mut config = BatchConfig::default(); - config.max_export_timeout = time::Duration::from_secs(if time_out { 5 } else { 60 }); + config.max_export_timeout = time::Duration::from_millis(if time_out { 5 } else { 60 }); config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush let exporter = BlockingExporter { - delay_for: time::Duration::from_secs(if !time_out { 5 } else { 60 }), + delay_for: time::Duration::from_millis(if !time_out { 5 } else { 60 }), delay_fn: async_std::task::sleep, }; let mut processor = BatchSpanProcessor::new( @@ -766,10 +766,10 @@ mod tests { // otherwise the exporter should be able to finish exporting within the timeout. async fn timeout_test_tokio(time_out: bool) { let mut config = BatchConfig::default(); - config.max_export_timeout = time::Duration::from_secs(if time_out { 5 } else { 60 }); + config.max_export_timeout = time::Duration::from_millis(if time_out { 5 } else { 60 }); config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush let exporter = BlockingExporter { - delay_for: time::Duration::from_secs(if !time_out { 5 } else { 60 }), + delay_for: time::Duration::from_millis(if !time_out { 5 } else { 60 }), delay_fn: tokio::time::delay_for, }; + let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut)); From 2c59fcf4cc76bb86dbefd12892fa4fc4f9a1e913 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Wed, 18 Nov 2020 19:01:50 -0500 Subject: [PATCH 07/10] remove unnecessary `as usize` --- opentelemetry/src/sdk/trace/span_processor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 08249b0c6a..2c3fedc879 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -278,7 +278,7 @@ impl BatchSpanProcessor { // Span batch interval time reached or a force flush has been invoked, export current spans. 
BatchMessage::Flush(Some(ch)) => { let mut results = Vec::with_capacity( - spans.len() / config.max_export_batch_size + 1 as usize, + spans.len() / config.max_export_batch_size + 1, ); while !spans.is_empty() { let batch = spans.split_off( @@ -316,7 +316,7 @@ impl BatchSpanProcessor { // Stream has terminated or processor is shutdown, return to finish execution. BatchMessage::Shutdown(ch) => { let mut results = Vec::with_capacity( - spans.len() / config.max_export_batch_size + 1 as usize, + spans.len() / config.max_export_batch_size + 1, ); while !spans.is_empty() { let batch = spans.split_off( From d2a69cad38feb546cd4d2a425f8bb4e32b7adb05 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Wed, 18 Nov 2020 20:10:08 -0500 Subject: [PATCH 08/10] format --- opentelemetry/src/sdk/trace/span_processor.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 2c3fedc879..0810bfd1aa 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -277,9 +277,8 @@ impl BatchSpanProcessor { } // Span batch interval time reached or a force flush has been invoked, export current spans. BatchMessage::Flush(Some(ch)) => { - let mut results = Vec::with_capacity( - spans.len() / config.max_export_batch_size + 1, - ); + let mut results = + Vec::with_capacity(spans.len() / config.max_export_batch_size + 1); while !spans.is_empty() { let batch = spans.split_off( spans.len().saturating_sub(config.max_export_batch_size), @@ -315,9 +314,8 @@ impl BatchSpanProcessor { } // Stream has terminated or processor is shutdown, return to finish execution. BatchMessage::Shutdown(ch) => { - let mut results = Vec::with_capacity( - spans.len() / config.max_export_batch_size + 1, - ); + let mut results = + Vec::with_capacity(spans.len() / config.max_export_batch_size + 1); while !spans.is_empty() { let batch = spans.split_off( spans.len().saturating_sub(config.max_export_batch_size), From 992abb8a9b7c116597a5e605f07ced9879dd6aca Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Wed, 18 Nov 2020 20:30:32 -0500 Subject: [PATCH 09/10] Address comments. --- opentelemetry-otlp/src/proto/opentelemetry-proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-otlp/src/proto/opentelemetry-proto b/opentelemetry-otlp/src/proto/opentelemetry-proto index 59c488bfb8..8ab21e9da6 160000 --- a/opentelemetry-otlp/src/proto/opentelemetry-proto +++ b/opentelemetry-otlp/src/proto/opentelemetry-proto @@ -1 +1 @@ -Subproject commit 59c488bfb8fb6d0458ad6425758b70259ff4a2bd +Subproject commit 8ab21e9da6246e465cd9d50d405561aedef31a1e From 1cc920d5372704ecf03e754f87cdac5cbf63e6a9 Mon Sep 17 00:00:00 2001 From: "zhongyang.wu" Date: Wed, 18 Nov 2020 20:52:39 -0500 Subject: [PATCH 10/10] Address comments. 
--- opentelemetry-otlp/src/proto/metrics.rs | 936 ++++++++++++++++-- opentelemetry/Cargo.toml | 2 +- opentelemetry/src/api/trace/mod.rs | 4 +- opentelemetry/src/sdk/trace/span_processor.rs | 14 +- 4 files changed, 882 insertions(+), 74 deletions(-) diff --git a/opentelemetry-otlp/src/proto/metrics.rs b/opentelemetry-otlp/src/proto/metrics.rs index ef3674e38d..039cb4793b 100644 --- a/opentelemetry-otlp/src/proto/metrics.rs +++ b/opentelemetry-otlp/src/proto/metrics.rs @@ -506,6 +506,7 @@ pub enum Metric_oneof_data { double_sum(DoubleSum), int_histogram(IntHistogram), double_histogram(DoubleHistogram), + double_summary(DoubleSummary), } impl Metric { @@ -884,6 +885,55 @@ impl Metric { DoubleHistogram::new() } } + + // .opentelemetry.proto.metrics.v1.DoubleSummary double_summary = 11; + + + pub fn get_double_summary(&self) -> &DoubleSummary { + match self.data { + ::std::option::Option::Some(Metric_oneof_data::double_summary(ref v)) => v, + _ => ::default_instance(), + } + } + pub fn clear_double_summary(&mut self) { + self.data = ::std::option::Option::None; + } + + pub fn has_double_summary(&self) -> bool { + match self.data { + ::std::option::Option::Some(Metric_oneof_data::double_summary(..)) => true, + _ => false, + } + } + + // Param is passed by value, moved + pub fn set_double_summary(&mut self, v: DoubleSummary) { + self.data = ::std::option::Option::Some(Metric_oneof_data::double_summary(v)) + } + + // Mutable pointer to the field. + pub fn mut_double_summary(&mut self) -> &mut DoubleSummary { + if let ::std::option::Option::Some(Metric_oneof_data::double_summary(_)) = self.data { + } else { + self.data = ::std::option::Option::Some(Metric_oneof_data::double_summary(DoubleSummary::new())); + } + match self.data { + ::std::option::Option::Some(Metric_oneof_data::double_summary(ref mut v)) => v, + _ => panic!(), + } + } + + // Take field + pub fn take_double_summary(&mut self) -> DoubleSummary { + if self.has_double_summary() { + match self.data.take() { + ::std::option::Option::Some(Metric_oneof_data::double_summary(v)) => v, + _ => panic!(), + } + } else { + DoubleSummary::new() + } + } } impl ::protobuf::Message for Metric { @@ -918,6 +968,11 @@ impl ::protobuf::Message for Metric { return false; } } + if let Some(Metric_oneof_data::double_summary(ref v)) = self.data { + if !v.is_initialized() { + return false; + } + } true } @@ -970,6 +1025,12 @@ impl ::protobuf::Message for Metric { } self.data = ::std::option::Option::Some(Metric_oneof_data::double_histogram(is.read_message()?)); }, + 11 => { + if wire_type != ::protobuf::wire_format::WireTypeLengthDelimited { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + self.data = ::std::option::Option::Some(Metric_oneof_data::double_summary(is.read_message()?)); + }, _ => { ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; }, @@ -1017,6 +1078,10 @@ impl ::protobuf::Message for Metric { let len = v.compute_size(); my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; }, + &Metric_oneof_data::double_summary(ref v) => { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; + }, }; } my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); @@ -1066,6 +1131,11 @@ impl ::protobuf::Message for Metric { os.write_raw_varint32(v.get_cached_size())?; v.write_to_with_cached_sizes(os)?; }, + &Metric_oneof_data::double_summary(ref v) => { + os.write_tag(11, 
::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; + }, }; } os.write_unknown_fields(self.get_unknown_fields())?; @@ -1151,6 +1221,11 @@ impl ::protobuf::Message for Metric { Metric::has_double_histogram, Metric::get_double_histogram, )); + fields.push(::protobuf::reflect::accessor::make_singular_message_accessor::<_, DoubleSummary>( + "double_summary", + Metric::has_double_summary, + Metric::get_double_summary, + )); ::protobuf::reflect::MessageDescriptor::new_pb_name::( "Metric", fields, @@ -1176,6 +1251,7 @@ impl ::protobuf::Clear for Metric { self.data = ::std::option::Option::None; self.data = ::std::option::Option::None; self.data = ::std::option::Option::None; + self.data = ::std::option::Option::None; self.unknown_fields.clear(); } } @@ -2400,6 +2476,175 @@ impl ::protobuf::reflect::ProtobufValue for DoubleHistogram { } } +#[derive(PartialEq,Clone,Default)] +#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] +pub struct DoubleSummary { + // message fields + pub data_points: ::protobuf::RepeatedField, + // special fields + #[cfg_attr(feature = "with-serde", serde(skip))] + pub unknown_fields: ::protobuf::UnknownFields, + #[cfg_attr(feature = "with-serde", serde(skip))] + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a DoubleSummary { + fn default() -> &'a DoubleSummary { + ::default_instance() + } +} + +impl DoubleSummary { + pub fn new() -> DoubleSummary { + ::std::default::Default::default() + } + + // repeated .opentelemetry.proto.metrics.v1.DoubleSummaryDataPoint data_points = 1; + + + pub fn get_data_points(&self) -> &[DoubleSummaryDataPoint] { + &self.data_points + } + pub fn clear_data_points(&mut self) { + self.data_points.clear(); + } + + // Param is passed by value, moved + pub fn set_data_points(&mut self, v: ::protobuf::RepeatedField) { + self.data_points = v; + } + + // Mutable pointer to the field. + pub fn mut_data_points(&mut self) -> &mut ::protobuf::RepeatedField { + &mut self.data_points + } + + // Take field + pub fn take_data_points(&mut self) -> ::protobuf::RepeatedField { + ::std::mem::replace(&mut self.data_points, ::protobuf::RepeatedField::new()) + } +} + +impl ::protobuf::Message for DoubleSummary { + fn is_initialized(&self) -> bool { + for v in &self.data_points { + if !v.is_initialized() { + return false; + } + }; + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? 
{ + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_repeated_message_into(wire_type, is, &mut self.data_points)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + for value in &self.data_points { + let len = value.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; + }; + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + for v in &self.data_points { + os.write_tag(1, ::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; + }; + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> DoubleSummary { + DoubleSummary::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( + "data_points", + |m: &DoubleSummary| { &m.data_points }, + |m: &mut DoubleSummary| { &mut m.data_points }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "DoubleSummary", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static DoubleSummary { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(DoubleSummary::new) + } +} + +impl ::protobuf::Clear for DoubleSummary { + fn clear(&mut self) { + self.data_points.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for DoubleSummary { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for DoubleSummary { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] pub struct IntDataPoint { @@ -3918,6 +4163,554 @@ impl ::protobuf::reflect::ProtobufValue for DoubleHistogramDataPoint { } } +#[derive(PartialEq,Clone,Default)] +#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] +pub struct DoubleSummaryDataPoint { + // message fields + pub labels: ::protobuf::RepeatedField, + pub 
start_time_unix_nano: u64, + pub time_unix_nano: u64, + pub count: u64, + pub sum: f64, + pub quantile_values: ::protobuf::RepeatedField, + // special fields + #[cfg_attr(feature = "with-serde", serde(skip))] + pub unknown_fields: ::protobuf::UnknownFields, + #[cfg_attr(feature = "with-serde", serde(skip))] + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a DoubleSummaryDataPoint { + fn default() -> &'a DoubleSummaryDataPoint { + ::default_instance() + } +} + +impl DoubleSummaryDataPoint { + pub fn new() -> DoubleSummaryDataPoint { + ::std::default::Default::default() + } + + // repeated .opentelemetry.proto.common.v1.StringKeyValue labels = 1; + + + pub fn get_labels(&self) -> &[super::common::StringKeyValue] { + &self.labels + } + pub fn clear_labels(&mut self) { + self.labels.clear(); + } + + // Param is passed by value, moved + pub fn set_labels(&mut self, v: ::protobuf::RepeatedField) { + self.labels = v; + } + + // Mutable pointer to the field. + pub fn mut_labels(&mut self) -> &mut ::protobuf::RepeatedField { + &mut self.labels + } + + // Take field + pub fn take_labels(&mut self) -> ::protobuf::RepeatedField { + ::std::mem::replace(&mut self.labels, ::protobuf::RepeatedField::new()) + } + + // fixed64 start_time_unix_nano = 2; + + + pub fn get_start_time_unix_nano(&self) -> u64 { + self.start_time_unix_nano + } + pub fn clear_start_time_unix_nano(&mut self) { + self.start_time_unix_nano = 0; + } + + // Param is passed by value, moved + pub fn set_start_time_unix_nano(&mut self, v: u64) { + self.start_time_unix_nano = v; + } + + // fixed64 time_unix_nano = 3; + + + pub fn get_time_unix_nano(&self) -> u64 { + self.time_unix_nano + } + pub fn clear_time_unix_nano(&mut self) { + self.time_unix_nano = 0; + } + + // Param is passed by value, moved + pub fn set_time_unix_nano(&mut self, v: u64) { + self.time_unix_nano = v; + } + + // fixed64 count = 4; + + + pub fn get_count(&self) -> u64 { + self.count + } + pub fn clear_count(&mut self) { + self.count = 0; + } + + // Param is passed by value, moved + pub fn set_count(&mut self, v: u64) { + self.count = v; + } + + // double sum = 5; + + + pub fn get_sum(&self) -> f64 { + self.sum + } + pub fn clear_sum(&mut self) { + self.sum = 0.; + } + + // Param is passed by value, moved + pub fn set_sum(&mut self, v: f64) { + self.sum = v; + } + + // repeated .opentelemetry.proto.metrics.v1.DoubleSummaryDataPoint.ValueAtQuantile quantile_values = 6; + + + pub fn get_quantile_values(&self) -> &[DoubleSummaryDataPoint_ValueAtQuantile] { + &self.quantile_values + } + pub fn clear_quantile_values(&mut self) { + self.quantile_values.clear(); + } + + // Param is passed by value, moved + pub fn set_quantile_values(&mut self, v: ::protobuf::RepeatedField) { + self.quantile_values = v; + } + + // Mutable pointer to the field. + pub fn mut_quantile_values(&mut self) -> &mut ::protobuf::RepeatedField { + &mut self.quantile_values + } + + // Take field + pub fn take_quantile_values(&mut self) -> ::protobuf::RepeatedField { + ::std::mem::replace(&mut self.quantile_values, ::protobuf::RepeatedField::new()) + } +} + +impl ::protobuf::Message for DoubleSummaryDataPoint { + fn is_initialized(&self) -> bool { + for v in &self.labels { + if !v.is_initialized() { + return false; + } + }; + for v in &self.quantile_values { + if !v.is_initialized() { + return false; + } + }; + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? 
{ + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + ::protobuf::rt::read_repeated_message_into(wire_type, is, &mut self.labels)?; + }, + 2 => { + if wire_type != ::protobuf::wire_format::WireTypeFixed64 { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_fixed64()?; + self.start_time_unix_nano = tmp; + }, + 3 => { + if wire_type != ::protobuf::wire_format::WireTypeFixed64 { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_fixed64()?; + self.time_unix_nano = tmp; + }, + 4 => { + if wire_type != ::protobuf::wire_format::WireTypeFixed64 { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_fixed64()?; + self.count = tmp; + }, + 5 => { + if wire_type != ::protobuf::wire_format::WireTypeFixed64 { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_double()?; + self.sum = tmp; + }, + 6 => { + ::protobuf::rt::read_repeated_message_into(wire_type, is, &mut self.quantile_values)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + for value in &self.labels { + let len = value.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; + }; + if self.start_time_unix_nano != 0 { + my_size += 9; + } + if self.time_unix_nano != 0 { + my_size += 9; + } + if self.count != 0 { + my_size += 9; + } + if self.sum != 0. { + my_size += 9; + } + for value in &self.quantile_values { + let len = value.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint32_size(len) + len; + }; + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + for v in &self.labels { + os.write_tag(1, ::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; + }; + if self.start_time_unix_nano != 0 { + os.write_fixed64(2, self.start_time_unix_nano)?; + } + if self.time_unix_nano != 0 { + os.write_fixed64(3, self.time_unix_nano)?; + } + if self.count != 0 { + os.write_fixed64(4, self.count)?; + } + if self.sum != 0. 
{ + os.write_double(5, self.sum)?; + } + for v in &self.quantile_values { + os.write_tag(6, ::protobuf::wire_format::WireTypeLengthDelimited)?; + os.write_raw_varint32(v.get_cached_size())?; + v.write_to_with_cached_sizes(os)?; + }; + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> DoubleSummaryDataPoint { + DoubleSummaryDataPoint::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( + "labels", + |m: &DoubleSummaryDataPoint| { &m.labels }, + |m: &mut DoubleSummaryDataPoint| { &mut m.labels }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeFixed64>( + "start_time_unix_nano", + |m: &DoubleSummaryDataPoint| { &m.start_time_unix_nano }, + |m: &mut DoubleSummaryDataPoint| { &mut m.start_time_unix_nano }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeFixed64>( + "time_unix_nano", + |m: &DoubleSummaryDataPoint| { &m.time_unix_nano }, + |m: &mut DoubleSummaryDataPoint| { &mut m.time_unix_nano }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeFixed64>( + "count", + |m: &DoubleSummaryDataPoint| { &m.count }, + |m: &mut DoubleSummaryDataPoint| { &mut m.count }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeDouble>( + "sum", + |m: &DoubleSummaryDataPoint| { &m.sum }, + |m: &mut DoubleSummaryDataPoint| { &mut m.sum }, + )); + fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeMessage>( + "quantile_values", + |m: &DoubleSummaryDataPoint| { &m.quantile_values }, + |m: &mut DoubleSummaryDataPoint| { &mut m.quantile_values }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "DoubleSummaryDataPoint", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static DoubleSummaryDataPoint { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(DoubleSummaryDataPoint::new) + } +} + +impl ::protobuf::Clear for DoubleSummaryDataPoint { + fn clear(&mut self) { + self.labels.clear(); + self.start_time_unix_nano = 0; + self.time_unix_nano = 0; + self.count = 0; + self.sum = 0.; + self.quantile_values.clear(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for DoubleSummaryDataPoint { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) 
+ } +} + +impl ::protobuf::reflect::ProtobufValue for DoubleSummaryDataPoint { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + +#[derive(PartialEq,Clone,Default)] +#[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] +pub struct DoubleSummaryDataPoint_ValueAtQuantile { + // message fields + pub quantile: f64, + pub value: f64, + // special fields + #[cfg_attr(feature = "with-serde", serde(skip))] + pub unknown_fields: ::protobuf::UnknownFields, + #[cfg_attr(feature = "with-serde", serde(skip))] + pub cached_size: ::protobuf::CachedSize, +} + +impl<'a> ::std::default::Default for &'a DoubleSummaryDataPoint_ValueAtQuantile { + fn default() -> &'a DoubleSummaryDataPoint_ValueAtQuantile { + ::default_instance() + } +} + +impl DoubleSummaryDataPoint_ValueAtQuantile { + pub fn new() -> DoubleSummaryDataPoint_ValueAtQuantile { + ::std::default::Default::default() + } + + // double quantile = 1; + + + pub fn get_quantile(&self) -> f64 { + self.quantile + } + pub fn clear_quantile(&mut self) { + self.quantile = 0.; + } + + // Param is passed by value, moved + pub fn set_quantile(&mut self, v: f64) { + self.quantile = v; + } + + // double value = 2; + + + pub fn get_value(&self) -> f64 { + self.value + } + pub fn clear_value(&mut self) { + self.value = 0.; + } + + // Param is passed by value, moved + pub fn set_value(&mut self, v: f64) { + self.value = v; + } +} + +impl ::protobuf::Message for DoubleSummaryDataPoint_ValueAtQuantile { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 1 => { + if wire_type != ::protobuf::wire_format::WireTypeFixed64 { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_double()?; + self.quantile = tmp; + }, + 2 => { + if wire_type != ::protobuf::wire_format::WireTypeFixed64 { + return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type)); + } + let tmp = is.read_double()?; + self.value = tmp; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if self.quantile != 0. { + my_size += 9; + } + if self.value != 0. { + my_size += 9; + } + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> { + if self.quantile != 0. { + os.write_double(1, self.quantile)?; + } + if self.value != 0. 
{ + os.write_double(2, self.value)?; + } + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &dyn (::std::any::Any) { + self as &dyn (::std::any::Any) + } + fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) { + self as &mut dyn (::std::any::Any) + } + fn into_any(self: ::std::boxed::Box) -> ::std::boxed::Box { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + Self::descriptor_static() + } + + fn new() -> DoubleSummaryDataPoint_ValueAtQuantile { + DoubleSummaryDataPoint_ValueAtQuantile::new() + } + + fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT; + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeDouble>( + "quantile", + |m: &DoubleSummaryDataPoint_ValueAtQuantile| { &m.quantile }, + |m: &mut DoubleSummaryDataPoint_ValueAtQuantile| { &mut m.quantile }, + )); + fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeDouble>( + "value", + |m: &DoubleSummaryDataPoint_ValueAtQuantile| { &m.value }, + |m: &mut DoubleSummaryDataPoint_ValueAtQuantile| { &mut m.value }, + )); + ::protobuf::reflect::MessageDescriptor::new_pb_name::( + "DoubleSummaryDataPoint.ValueAtQuantile", + fields, + file_descriptor_proto() + ) + }) + } + + fn default_instance() -> &'static DoubleSummaryDataPoint_ValueAtQuantile { + static instance: ::protobuf::rt::LazyV2 = ::protobuf::rt::LazyV2::INIT; + instance.get(DoubleSummaryDataPoint_ValueAtQuantile::new) + } +} + +impl ::protobuf::Clear for DoubleSummaryDataPoint_ValueAtQuantile { + fn clear(&mut self) { + self.quantile = 0.; + self.value = 0.; + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for DoubleSummaryDataPoint_ValueAtQuantile { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for DoubleSummaryDataPoint_ValueAtQuantile { + fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef { + ::protobuf::reflect::ReflectValueRef::Message(self) + } +} + #[derive(PartialEq,Clone,Default)] #[cfg_attr(feature = "with-serde", derive(Serialize, Deserialize))] pub struct IntExemplar { @@ -4629,7 +5422,7 @@ static file_descriptor_proto_data: &'static [u8] = b"\ tationLibraryMetrics\x12n\n\x17instrumentation_library\x18\x01\x20\x01(\ \x0b25.opentelemetry.proto.common.v1.InstrumentationLibraryR\x16instrume\ ntationLibrary\x12@\n\x07metrics\x18\x02\x20\x03(\x0b2&.opentelemetry.pr\ - oto.metrics.v1.MetricR\x07metrics\"\xb7\x04\n\x06Metric\x12\x12\n\x04nam\ + oto.metrics.v1.MetricR\x07metrics\"\x8f\x05\n\x06Metric\x12\x12\n\x04nam\ e\x18\x01\x20\x01(\tR\x04name\x12\x20\n\x0bdescription\x18\x02\x20\x01(\ \tR\x0bdescription\x12\x12\n\x04unit\x18\x03\x20\x01(\tR\x04unit\x12G\n\ \tint_gauge\x18\x04\x20\x01(\x0b2(.opentelemetry.proto.metrics.v1.IntGau\ @@ -4640,70 +5433,83 @@ static file_descriptor_proto_data: &'static [u8] = b"\ 
s.v1.DoubleSumH\0R\tdoubleSum\x12S\n\rint_histogram\x18\x08\x20\x01(\x0b\ 2,.opentelemetry.proto.metrics.v1.IntHistogramH\0R\x0cintHistogram\x12\\\ \n\x10double_histogram\x18\t\x20\x01(\x0b2/.opentelemetry.proto.metrics.\ - v1.DoubleHistogramH\0R\x0fdoubleHistogramB\x06\n\x04data\"Y\n\x08IntGaug\ - e\x12M\n\x0bdata_points\x18\x01\x20\x03(\x0b2,.opentelemetry.proto.metri\ - cs.v1.IntDataPointR\ndataPoints\"_\n\x0bDoubleGauge\x12P\n\x0bdata_point\ - s\x18\x01\x20\x03(\x0b2/.opentelemetry.proto.metrics.v1.DoubleDataPointR\ - \ndataPoints\"\xeb\x01\n\x06IntSum\x12M\n\x0bdata_points\x18\x01\x20\x03\ - (\x0b2,.opentelemetry.proto.metrics.v1.IntDataPointR\ndataPoints\x12o\n\ - \x17aggregation_temporality\x18\x02\x20\x01(\x0e26.opentelemetry.proto.m\ - etrics.v1.AggregationTemporalityR\x16aggregationTemporality\x12!\n\x0cis\ - _monotonic\x18\x03\x20\x01(\x08R\x0bisMonotonic\"\xf1\x01\n\tDoubleSum\ - \x12P\n\x0bdata_points\x18\x01\x20\x03(\x0b2/.opentelemetry.proto.metric\ - s.v1.DoubleDataPointR\ndataPoints\x12o\n\x17aggregation_temporality\x18\ - \x02\x20\x01(\x0e26.opentelemetry.proto.metrics.v1.AggregationTemporalit\ - yR\x16aggregationTemporality\x12!\n\x0cis_monotonic\x18\x03\x20\x01(\x08\ - R\x0bisMonotonic\"\xd7\x01\n\x0cIntHistogram\x12V\n\x0bdata_points\x18\ - \x01\x20\x03(\x0b25.opentelemetry.proto.metrics.v1.IntHistogramDataPoint\ - R\ndataPoints\x12o\n\x17aggregation_temporality\x18\x02\x20\x01(\x0e26.o\ - pentelemetry.proto.metrics.v1.AggregationTemporalityR\x16aggregationTemp\ - orality\"\xdd\x01\n\x0fDoubleHistogram\x12Y\n\x0bdata_points\x18\x01\x20\ - \x03(\x0b28.opentelemetry.proto.metrics.v1.DoubleHistogramDataPointR\nda\ - taPoints\x12o\n\x17aggregation_temporality\x18\x02\x20\x01(\x0e26.opente\ - lemetry.proto.metrics.v1.AggregationTemporalityR\x16aggregationTemporali\ - ty\"\x8d\x02\n\x0cIntDataPoint\x12E\n\x06labels\x18\x01\x20\x03(\x0b2-.o\ - pentelemetry.proto.common.v1.StringKeyValueR\x06labels\x12/\n\x14start_t\ - ime_unix_nano\x18\x02\x20\x01(\x06R\x11startTimeUnixNano\x12$\n\x0etime_\ - unix_nano\x18\x03\x20\x01(\x06R\x0ctimeUnixNano\x12\x14\n\x05value\x18\ - \x04\x20\x01(\x10R\x05value\x12I\n\texemplars\x18\x05\x20\x03(\x0b2+.ope\ - ntelemetry.proto.metrics.v1.IntExemplarR\texemplars\"\x93\x02\n\x0fDoubl\ - eDataPoint\x12E\n\x06labels\x18\x01\x20\x03(\x0b2-.opentelemetry.proto.c\ - ommon.v1.StringKeyValueR\x06labels\x12/\n\x14start_time_unix_nano\x18\ - \x02\x20\x01(\x06R\x11startTimeUnixNano\x12$\n\x0etime_unix_nano\x18\x03\ - \x20\x01(\x06R\x0ctimeUnixNano\x12\x14\n\x05value\x18\x04\x20\x01(\x01R\ - \x05value\x12L\n\texemplars\x18\x05\x20\x03(\x0b2..opentelemetry.proto.m\ - etrics.v1.DoubleExemplarR\texemplars\"\xf6\x02\n\x15IntHistogramDataPoin\ - t\x12E\n\x06labels\x18\x01\x20\x03(\x0b2-.opentelemetry.proto.common.v1.\ - StringKeyValueR\x06labels\x12/\n\x14start_time_unix_nano\x18\x02\x20\x01\ - (\x06R\x11startTimeUnixNano\x12$\n\x0etime_unix_nano\x18\x03\x20\x01(\ - \x06R\x0ctimeUnixNano\x12\x14\n\x05count\x18\x04\x20\x01(\x06R\x05count\ - \x12\x10\n\x03sum\x18\x05\x20\x01(\x10R\x03sum\x12#\n\rbucket_counts\x18\ - \x06\x20\x03(\x06R\x0cbucketCounts\x12'\n\x0fexplicit_bounds\x18\x07\x20\ - \x03(\x01R\x0eexplicitBounds\x12I\n\texemplars\x18\x08\x20\x03(\x0b2+.op\ - entelemetry.proto.metrics.v1.IntExemplarR\texemplars\"\xfc\x02\n\x18Doub\ - leHistogramDataPoint\x12E\n\x06labels\x18\x01\x20\x03(\x0b2-.opentelemet\ - ry.proto.common.v1.StringKeyValueR\x06labels\x12/\n\x14start_time_unix_n\ - ano\x18\x02\x20\x01(\x06R\x11startTimeUnixNano\x12$\n\x0etime_unix_nano\ - 
\x18\x03\x20\x01(\x06R\x0ctimeUnixNano\x12\x14\n\x05count\x18\x04\x20\ - \x01(\x06R\x05count\x12\x10\n\x03sum\x18\x05\x20\x01(\x01R\x03sum\x12#\n\ - \rbucket_counts\x18\x06\x20\x03(\x06R\x0cbucketCounts\x12'\n\x0fexplicit\ - _bounds\x18\x07\x20\x03(\x01R\x0eexplicitBounds\x12L\n\texemplars\x18\ - \x08\x20\x03(\x0b2..opentelemetry.proto.metrics.v1.DoubleExemplarR\texem\ - plars\"\xd5\x01\n\x0bIntExemplar\x12V\n\x0ffiltered_labels\x18\x01\x20\ - \x03(\x0b2-.opentelemetry.proto.common.v1.StringKeyValueR\x0efilteredLab\ - els\x12$\n\x0etime_unix_nano\x18\x02\x20\x01(\x06R\x0ctimeUnixNano\x12\ - \x14\n\x05value\x18\x03\x20\x01(\x10R\x05value\x12\x17\n\x07span_id\x18\ - \x04\x20\x01(\x0cR\x06spanId\x12\x19\n\x08trace_id\x18\x05\x20\x01(\x0cR\ - \x07traceId\"\xd8\x01\n\x0eDoubleExemplar\x12V\n\x0ffiltered_labels\x18\ - \x01\x20\x03(\x0b2-.opentelemetry.proto.common.v1.StringKeyValueR\x0efil\ - teredLabels\x12$\n\x0etime_unix_nano\x18\x02\x20\x01(\x06R\x0ctimeUnixNa\ - no\x12\x14\n\x05value\x18\x03\x20\x01(\x01R\x05value\x12\x17\n\x07span_i\ - d\x18\x04\x20\x01(\x0cR\x06spanId\x12\x19\n\x08trace_id\x18\x05\x20\x01(\ - \x0cR\x07traceId*\x8c\x01\n\x16AggregationTemporality\x12'\n#AGGREGATION\ - _TEMPORALITY_UNSPECIFIED\x10\0\x12!\n\x1dAGGREGATION_TEMPORALITY_DELTA\ - \x10\x01\x12&\n\"AGGREGATION_TEMPORALITY_CUMULATIVE\x10\x02Bt\n!io.opent\ - elemetry.proto.metrics.v1B\x0cMetricsProtoP\x01Z?github.com/open-telemet\ - ry/opentelemetry-proto/gen/go/metrics/v1b\x06proto3\ + v1.DoubleHistogramH\0R\x0fdoubleHistogram\x12V\n\x0edouble_summary\x18\ + \x0b\x20\x01(\x0b2-.opentelemetry.proto.metrics.v1.DoubleSummaryH\0R\rdo\ + ubleSummaryB\x06\n\x04data\"Y\n\x08IntGauge\x12M\n\x0bdata_points\x18\ + \x01\x20\x03(\x0b2,.opentelemetry.proto.metrics.v1.IntDataPointR\ndataPo\ + ints\"_\n\x0bDoubleGauge\x12P\n\x0bdata_points\x18\x01\x20\x03(\x0b2/.op\ + entelemetry.proto.metrics.v1.DoubleDataPointR\ndataPoints\"\xeb\x01\n\ + \x06IntSum\x12M\n\x0bdata_points\x18\x01\x20\x03(\x0b2,.opentelemetry.pr\ + oto.metrics.v1.IntDataPointR\ndataPoints\x12o\n\x17aggregation_temporali\ + ty\x18\x02\x20\x01(\x0e26.opentelemetry.proto.metrics.v1.AggregationTemp\ + oralityR\x16aggregationTemporality\x12!\n\x0cis_monotonic\x18\x03\x20\ + \x01(\x08R\x0bisMonotonic\"\xf1\x01\n\tDoubleSum\x12P\n\x0bdata_points\ + \x18\x01\x20\x03(\x0b2/.opentelemetry.proto.metrics.v1.DoubleDataPointR\ + \ndataPoints\x12o\n\x17aggregation_temporality\x18\x02\x20\x01(\x0e26.op\ + entelemetry.proto.metrics.v1.AggregationTemporalityR\x16aggregationTempo\ + rality\x12!\n\x0cis_monotonic\x18\x03\x20\x01(\x08R\x0bisMonotonic\"\xd7\ + \x01\n\x0cIntHistogram\x12V\n\x0bdata_points\x18\x01\x20\x03(\x0b25.open\ + telemetry.proto.metrics.v1.IntHistogramDataPointR\ndataPoints\x12o\n\x17\ + aggregation_temporality\x18\x02\x20\x01(\x0e26.opentelemetry.proto.metri\ + cs.v1.AggregationTemporalityR\x16aggregationTemporality\"\xdd\x01\n\x0fD\ + oubleHistogram\x12Y\n\x0bdata_points\x18\x01\x20\x03(\x0b28.opentelemetr\ + y.proto.metrics.v1.DoubleHistogramDataPointR\ndataPoints\x12o\n\x17aggre\ + gation_temporality\x18\x02\x20\x01(\x0e26.opentelemetry.proto.metrics.v1\ + .AggregationTemporalityR\x16aggregationTemporality\"h\n\rDoubleSummary\ + \x12W\n\x0bdata_points\x18\x01\x20\x03(\x0b26.opentelemetry.proto.metric\ + s.v1.DoubleSummaryDataPointR\ndataPoints\"\x8d\x02\n\x0cIntDataPoint\x12\ + E\n\x06labels\x18\x01\x20\x03(\x0b2-.opentelemetry.proto.common.v1.Strin\ + gKeyValueR\x06labels\x12/\n\x14start_time_unix_nano\x18\x02\x20\x01(\x06\ + 
R\x11startTimeUnixNano\x12$\n\x0etime_unix_nano\x18\x03\x20\x01(\x06R\ + \x0ctimeUnixNano\x12\x14\n\x05value\x18\x04\x20\x01(\x10R\x05value\x12I\ + \n\texemplars\x18\x05\x20\x03(\x0b2+.opentelemetry.proto.metrics.v1.IntE\ + xemplarR\texemplars\"\x93\x02\n\x0fDoubleDataPoint\x12E\n\x06labels\x18\ + \x01\x20\x03(\x0b2-.opentelemetry.proto.common.v1.StringKeyValueR\x06lab\ + els\x12/\n\x14start_time_unix_nano\x18\x02\x20\x01(\x06R\x11startTimeUni\ + xNano\x12$\n\x0etime_unix_nano\x18\x03\x20\x01(\x06R\x0ctimeUnixNano\x12\ + \x14\n\x05value\x18\x04\x20\x01(\x01R\x05value\x12L\n\texemplars\x18\x05\ + \x20\x03(\x0b2..opentelemetry.proto.metrics.v1.DoubleExemplarR\texemplar\ + s\"\xf6\x02\n\x15IntHistogramDataPoint\x12E\n\x06labels\x18\x01\x20\x03(\ + \x0b2-.opentelemetry.proto.common.v1.StringKeyValueR\x06labels\x12/\n\ + \x14start_time_unix_nano\x18\x02\x20\x01(\x06R\x11startTimeUnixNano\x12$\ + \n\x0etime_unix_nano\x18\x03\x20\x01(\x06R\x0ctimeUnixNano\x12\x14\n\x05\ + count\x18\x04\x20\x01(\x06R\x05count\x12\x10\n\x03sum\x18\x05\x20\x01(\ + \x10R\x03sum\x12#\n\rbucket_counts\x18\x06\x20\x03(\x06R\x0cbucketCounts\ + \x12'\n\x0fexplicit_bounds\x18\x07\x20\x03(\x01R\x0eexplicitBounds\x12I\ + \n\texemplars\x18\x08\x20\x03(\x0b2+.opentelemetry.proto.metrics.v1.IntE\ + xemplarR\texemplars\"\xfc\x02\n\x18DoubleHistogramDataPoint\x12E\n\x06la\ + bels\x18\x01\x20\x03(\x0b2-.opentelemetry.proto.common.v1.StringKeyValue\ + R\x06labels\x12/\n\x14start_time_unix_nano\x18\x02\x20\x01(\x06R\x11star\ + tTimeUnixNano\x12$\n\x0etime_unix_nano\x18\x03\x20\x01(\x06R\x0ctimeUnix\ + Nano\x12\x14\n\x05count\x18\x04\x20\x01(\x06R\x05count\x12\x10\n\x03sum\ + \x18\x05\x20\x01(\x01R\x03sum\x12#\n\rbucket_counts\x18\x06\x20\x03(\x06\ + R\x0cbucketCounts\x12'\n\x0fexplicit_bounds\x18\x07\x20\x03(\x01R\x0eexp\ + licitBounds\x12L\n\texemplars\x18\x08\x20\x03(\x0b2..opentelemetry.proto\ + .metrics.v1.DoubleExemplarR\texemplars\"\x94\x03\n\x16DoubleSummaryDataP\ + oint\x12E\n\x06labels\x18\x01\x20\x03(\x0b2-.opentelemetry.proto.common.\ + v1.StringKeyValueR\x06labels\x12/\n\x14start_time_unix_nano\x18\x02\x20\ + \x01(\x06R\x11startTimeUnixNano\x12$\n\x0etime_unix_nano\x18\x03\x20\x01\ + (\x06R\x0ctimeUnixNano\x12\x14\n\x05count\x18\x04\x20\x01(\x06R\x05count\ + \x12\x10\n\x03sum\x18\x05\x20\x01(\x01R\x03sum\x12o\n\x0fquantile_values\ + \x18\x06\x20\x03(\x0b2F.opentelemetry.proto.metrics.v1.DoubleSummaryData\ + Point.ValueAtQuantileR\x0equantileValues\x1aC\n\x0fValueAtQuantile\x12\ + \x1a\n\x08quantile\x18\x01\x20\x01(\x01R\x08quantile\x12\x14\n\x05value\ + \x18\x02\x20\x01(\x01R\x05value\"\xd5\x01\n\x0bIntExemplar\x12V\n\x0ffil\ + tered_labels\x18\x01\x20\x03(\x0b2-.opentelemetry.proto.common.v1.String\ + KeyValueR\x0efilteredLabels\x12$\n\x0etime_unix_nano\x18\x02\x20\x01(\ + \x06R\x0ctimeUnixNano\x12\x14\n\x05value\x18\x03\x20\x01(\x10R\x05value\ + \x12\x17\n\x07span_id\x18\x04\x20\x01(\x0cR\x06spanId\x12\x19\n\x08trace\ + _id\x18\x05\x20\x01(\x0cR\x07traceId\"\xd8\x01\n\x0eDoubleExemplar\x12V\ + \n\x0ffiltered_labels\x18\x01\x20\x03(\x0b2-.opentelemetry.proto.common.\ + v1.StringKeyValueR\x0efilteredLabels\x12$\n\x0etime_unix_nano\x18\x02\ + \x20\x01(\x06R\x0ctimeUnixNano\x12\x14\n\x05value\x18\x03\x20\x01(\x01R\ + \x05value\x12\x17\n\x07span_id\x18\x04\x20\x01(\x0cR\x06spanId\x12\x19\n\ + \x08trace_id\x18\x05\x20\x01(\x0cR\x07traceId*\x8c\x01\n\x16AggregationT\ + emporality\x12'\n#AGGREGATION_TEMPORALITY_UNSPECIFIED\x10\0\x12!\n\x1dAG\ + GREGATION_TEMPORALITY_DELTA\x10\x01\x12&\n\"AGGREGATION_TEMPORALITY_CUMU\ + 
LATIVE\x10\x02Bt\n!io.opentelemetry.proto.metrics.v1B\x0cMetricsProtoP\ + \x01Z?github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1b\ + \x06proto3\ "; static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT; diff --git a/opentelemetry/Cargo.toml b/opentelemetry/Cargo.toml index d09b814495..8aa495134e 100644 --- a/opentelemetry/Cargo.toml +++ b/opentelemetry/Cargo.toml @@ -26,7 +26,7 @@ async-trait = { version = "0.1", optional = true } bincode = { version = "1.2", optional = true } dashmap = { version = "4.0.0-rc6", optional = true } fnv = { version = "1.0", optional = true } -futures = { version = "0.3", features = ["std"] } +futures = "0.3" lazy_static = "1.4" percent-encoding = { version = "2.0", optional = true } pin-project = { version = "0.4", optional = true } diff --git a/opentelemetry/src/api/trace/mod.rs b/opentelemetry/src/api/trace/mod.rs index 776fed0151..075cd88697 100644 --- a/opentelemetry/src/api/trace/mod.rs +++ b/opentelemetry/src/api/trace/mod.rs @@ -110,6 +110,8 @@ //! Please review the W3C specification for details on the [Tracestate //! field](https://www.w3.org/TR/trace-context/#tracestate-field). //! +use thiserror::Error; + mod context; mod event; mod futures; @@ -137,8 +139,6 @@ pub use self::{ tracer::{SpanBuilder, Tracer}, }; -use thiserror::Error; - /// Errors returned by the trace API. #[derive(Error, Debug, PartialEq)] #[non_exhaustive] diff --git a/opentelemetry/src/sdk/trace/span_processor.rs b/opentelemetry/src/sdk/trace/span_processor.rs index 0810bfd1aa..7ce7c2b5f3 100644 --- a/opentelemetry/src/sdk/trace/span_processor.rs +++ b/opentelemetry/src/sdk/trace/span_processor.rs @@ -44,7 +44,7 @@ use futures::{ channel::mpsc, channel::oneshot, executor, future::BoxFuture, future::Either, pin_mut, Future, FutureExt, Stream, StreamExt, }; -use std::{fmt, pin::Pin, str::FromStr, sync::Mutex, time}; +use std::{fmt, str::FromStr, sync::Mutex, time}; /// Delay interval between two consecutive exports, default to be 5000. const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS"; @@ -140,7 +140,11 @@ impl SpanProcessor for SimpleSpanProcessor { exporter.shutdown(); Ok(()) } else { - Err(TraceError::Other("When force flushing the SimpleSpanProcessor, the exporter's lock has been poisoned".into()).into()) + Err(TraceError::Other( + "When shutting down the SimpleSpanProcessor, the exporter's lock has been poisoned" + .into(), + ) + .into()) } } } @@ -189,7 +193,6 @@ impl SpanProcessor for SimpleSpanProcessor { /// [`async-std`]: https://async.rs pub struct BatchSpanProcessor { message_sender: Mutex>, - _worker_handle: Option + Send + Sync>>>, } impl fmt::Debug for BatchSpanProcessor { @@ -224,7 +227,7 @@ impl SpanProcessor for BatchSpanProcessor { } fn shutdown(&mut self) -> Result<(), Box> { - let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When force flushing the BatchSpanProcessor, the message sender's lock has been poisoned".into()))?; + let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When shutting down the BatchSpanProcessor, the message sender's lock has been poisoned".into()))?; let (res_sender, res_receiver) = oneshot::channel::>(); sender.try_send(BatchMessage::Shutdown(res_sender))?; for result in futures::executor::block_on(res_receiver)? 
{ @@ -263,7 +266,7 @@ impl BatchSpanProcessor { let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush(None)); // Spawn worker process via user-defined spawn function. - let worker_handle = spawn(Box::pin(async move { + let _worker_handle = spawn(Box::pin(async move { let mut spans = Vec::new(); let mut messages = Box::pin(futures::stream::select(message_receiver, ticker)); @@ -344,7 +347,6 @@ impl BatchSpanProcessor { // Return batch processor with link to worker BatchSpanProcessor { message_sender: Mutex::new(message_sender), - _worker_handle: Some(Box::pin(worker_handle)), } }