Skip to content

Commit

Permalink
Implement timeout for force_flush and shutdown.
Browse files Browse the repository at this point in the history
Note that the spec only has configuration for maximum time to export data. So the timeout is based on the export time.
  • Loading branch information
TommyCpp committed Nov 14, 2020
1 parent 310df35 commit 463823c
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 28 deletions.
1 change: 1 addition & 0 deletions opentelemetry/Cargo.toml
Expand Up @@ -44,6 +44,7 @@ surf = { version = "2.0", default-features = false, optional = true }
criterion = "0.3.1"
rand_distr = "0.3.0"
tokio = { version = "0.2", features = ["full"] }
async-std = { version = "1.6", features = ["unstable"]}

[features]
default = ["trace"]
Expand Down
211 changes: 183 additions & 28 deletions opentelemetry/src/sdk/trace/span_processor.rs
Expand Up @@ -44,6 +44,11 @@ use std::pin::Pin;
use std::str::FromStr;
use std::sync::Mutex;
use std::time;
use crate::exporter::trace::ExportResult;
use std::error::Error;
use std::fmt::Display;
use futures::pin_mut;
use futures::future::Either;

/// Delay interval between two consecutive exports, default to be 5000.
const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS";
Expand All @@ -57,6 +62,10 @@ const OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT: usize = 2048;
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// Maximum allowed time to export data
// const OTEL_BSP_EXPORT_TIMEOUT_MILLIS: &str = "OTEL_BSP_EXPORT_TIMEOUT_MILLIS";
/// Default maximum allowed time to export data
const OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT: u64 = 30000;

/// `SpanProcessor` is an interface which allows hooks for span start and end
/// method invocations. The span processors are invoked only when is_recording
Expand Down Expand Up @@ -180,7 +189,7 @@ impl SpanProcessor for SimpleSpanProcessor {
/// [`async-std`]: https://async.rs
pub struct BatchSpanProcessor {
message_sender: Mutex<mpsc::Sender<BatchMessage>>,
worker_handle: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
worker_handle: Option<Pin<Box<dyn Future<Output=()> + Send + Sync>>>,
}

impl fmt::Debug for BatchSpanProcessor {
Expand Down Expand Up @@ -228,18 +237,32 @@ enum BatchMessage {
Shutdown,
}

#[derive(Debug, Default)]
struct TimeoutErr {}

impl Display for TimeoutErr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("timed out")
}
}

impl Error for TimeoutErr {}

impl BatchSpanProcessor {
pub(crate) fn new<S, SH, SO, I, IS, ISI>(
pub(crate) fn new<S, SH, SO, I, IS, ISI, D, DS>(
mut exporter: Box<dyn SpanExporter>,
spawn: S,
interval: I,
delay: D,
config: BatchConfig,
) -> Self
where
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output = SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IS,
IS: Stream<Item = ISI> + Send + 'static,
where
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output=SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IS,
IS: Stream<Item=ISI> + Send + 'static,
D: (Fn(time::Duration) -> DS) + Send + Sync + 'static,
DS: Future<Output=()> + 'static + Send + Sync,
{
let (message_sender, message_receiver) = mpsc::channel(config.max_queue_size);
let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush);
Expand All @@ -264,8 +287,15 @@ impl BatchSpanProcessor {
spans.len().saturating_sub(config.max_export_batch_size),
);

let export = exporter.export(batch);
let timeout = delay(config.max_export_timeout);
pin_mut!(export);
pin_mut!(timeout);
// TODO: Surface error through global error handler
let _result = exporter.export(batch).await;
let _result = match futures::future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => ExportResult::Err(Box::new(TimeoutErr::default())),
};
}
}
// Stream has terminated or processor is shutdown, return to finish execution.
Expand All @@ -275,16 +305,23 @@ impl BatchSpanProcessor {
spans.len().saturating_sub(config.max_export_batch_size),
);

let export = exporter.export(batch);
let timeout = delay(config.max_export_timeout);
pin_mut!(export);
pin_mut!(timeout);
// TODO: Surface error through global error handler
let _result = exporter.export(batch).await;
let _result = match futures::future::select(export, timeout).await {
Either::Left((export_res, _)) => export_res,
Either::Right((_, _)) => ExportResult::Err(Box::new(TimeoutErr::default())),
};
}
exporter.shutdown();
break;
}
}
}
}))
.map(|_| ());
.map(|_| ());

// Return batch processor with link to worker
BatchSpanProcessor {
Expand All @@ -299,11 +336,11 @@ impl BatchSpanProcessor {
spawn: S,
interval: I,
) -> BatchSpanProcessorBuilder<E, S, I>
where
E: SpanExporter,
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output = SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IO,
where
E: SpanExporter,
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output=SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IO,
{
BatchSpanProcessorBuilder {
exporter,
Expand All @@ -325,11 +362,11 @@ impl BatchSpanProcessor {
spawn: S,
interval: I,
) -> BatchSpanProcessorBuilder<E, S, I>
where
E: SpanExporter,
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output = SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IO,
where
E: SpanExporter,
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output=SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IO,
{
let mut config = BatchConfig::default();
let schedule_delay = std::env::var(OTEL_BSP_SCHEDULE_DELAY_MILLIS)
Expand Down Expand Up @@ -382,6 +419,9 @@ pub struct BatchConfig {
/// of spans one batch after the other without any delay. The default value
/// is 512.
max_export_batch_size: usize,

/// The maximum duration to export a batch of data.
max_export_timeout: time::Duration,
}

impl Default for BatchConfig {
Expand All @@ -390,6 +430,7 @@ impl Default for BatchConfig {
max_queue_size: OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: time::Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT),
max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
max_export_timeout: time::Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_MILLIS_DEFAULT),
}
}
}
Expand All @@ -406,12 +447,12 @@ pub struct BatchSpanProcessorBuilder<E, S, I> {
}

impl<E, S, SH, SO, I, IS, ISI> BatchSpanProcessorBuilder<E, S, I>
where
E: SpanExporter + 'static,
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output = SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IS,
IS: Stream<Item = ISI> + Send + 'static,
where
E: SpanExporter + 'static,
S: Fn(BoxFuture<'static, ()>) -> SH,
SH: Future<Output=SO> + Send + Sync + 'static,
I: Fn(time::Duration) -> IS,
IS: Stream<Item=ISI> + Send + 'static,
{
/// Set max queue size for batches
pub fn with_max_queue_size(self, size: usize) -> Self {
Expand Down Expand Up @@ -449,25 +490,32 @@ where
Box::new(self.exporter),
self.spawn,
self.interval,
tokio::time::delay_for,
self.config,
)
}
}

#[cfg(test)]
#[allow(warnings)]
mod tests {
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS,
OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
};
use crate::exporter::trace::stdout;
use crate::exporter::trace::{stdout, SpanExporter, SpanData, ExportResult};
use futures::pin_mut;
use crate::sdk::trace::BatchConfig;
use crate::testing::trace::{
new_test_export_span_data, new_test_exporter, new_tokio_test_exporter,
};
use std::time;
use tokio::time::Duration;
use async_trait::async_trait;
use std::time::{Instant, Duration};
use async_std::prelude::*;
use std::ops::Sub;
use std::fmt::Debug;

#[test]
fn simple_span_processor_on_end_calls_export() {
Expand Down Expand Up @@ -526,6 +574,7 @@ mod tests {
Box::new(exporter),
tokio::spawn,
tokio::time::interval,
tokio::time::delay_for,
config,
);
let handle = tokio::spawn(async move {
Expand All @@ -547,4 +596,110 @@ mod tests {
"timed out in 5 seconds. force_flush may not export any data when called"
);
}

struct BlockingExporter<D> {
delay_for: time::Duration,
delay_fn: D,
}

impl<D, DS> Debug for BlockingExporter<D>
where D: Fn(time::Duration) -> DS + 'static + Send + Sync,
DS: Future<Output=()> + Send + Sync + 'static {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("blocking exporter for testing")
}
}

#[async_trait]
impl<D, DS> SpanExporter for BlockingExporter<D>
where D: Fn(time::Duration) -> DS + 'static + Send + Sync,
DS: Future<Output=()> + Send + Sync + 'static {
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
println!("Accepting {} spans", batch.len());
(self.delay_fn)(self.delay_for).await;
println!("Finish exporting, return result from exporter");
Ok(())
}
}

#[test]
fn test_timeout_std_async() {
async_std::task::block_on(async {
let mut config = BatchConfig::default();
config.max_export_timeout = time::Duration::from_secs(5);
config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush
let mut exporter = BlockingExporter {
delay_for: time::Duration::from_secs(30),
delay_fn: async_std::task::sleep,
};
let mut processor = BatchSpanProcessor::new(
Box::new(exporter),
async_std::task::spawn,
async_std::stream::interval,
async_std::task::sleep,
config,
);
async_std::task::sleep(time::Duration::from_secs(1)).await; // skip the first
let start_time = Instant::now();
processor.on_end(new_test_export_span_data());
processor.force_flush();
processor.shutdown();
let end_time = Instant::now();
assert!(end_time.sub(start_time).as_secs() < 7);
})
}

#[test]
fn test_timeout_tokio() {
let mut runtime = tokio::runtime::Builder::new().threaded_scheduler().enable_all().build().unwrap();
/// Assert time out will return if exporter takes too long
runtime.block_on(async {
let mut config = BatchConfig::default();
config.max_export_timeout = time::Duration::from_secs(5);
config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush
let mut exporter = BlockingExporter {
delay_for: time::Duration::from_secs(30),
delay_fn: tokio::time::delay_for,
};
let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut));
let mut processor = BatchSpanProcessor::new(
Box::new(exporter),
spawn,
tokio::time::interval,
tokio::time::delay_for,
config,
);
let start_time = Instant::now();
processor.on_end(new_test_export_span_data());
processor.force_flush();
processor.shutdown();
let end_time = Instant::now();
assert!(end_time.sub(start_time).as_secs() < 7);
})

/// Assert exporters could return if the time spend is less than timeout.
runtime.block_on(async {
let mut config = BatchConfig::default();
config.max_export_timeout = time::Duration::from_secs(5);
config.scheduled_delay = Duration::from_secs(60 * 60 * 24); // set the tick to 24 hours so we know the span must be exported via force_flush
let mut exporter = BlockingExporter {
delay_for: time::Duration::from_secs(30),
delay_fn: tokio::time::delay_for,
};
let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut));
let mut processor = BatchSpanProcessor::new(
Box::new(exporter),
spawn,
tokio::time::interval,
tokio::time::delay_for,
config,
);
let start_time = Instant::now();
processor.on_end(new_test_export_span_data());
processor.force_flush();
processor.shutdown();
let end_time = Instant::now();
assert!(end_time.sub(start_time).as_secs() < 7);
})
}
}

0 comments on commit 463823c

Please sign in to comment.