Add configuration for BSP max_concurrent_exports
Users may desire to control the level of export concurrency in the batch
span processor. There are two special values:

    max_concurrent_exports = 0: no bound on concurrency
    max_concurrent_exports = 1: no concurrency, makes everything
    synchronous on the messaging task.
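
For example, a processor capped at four in-flight exports could be configured
through the builder method added in the diff below. This is an illustrative
sketch only: the exporter, the Tokio runtime choice, and the import paths are
placeholders, not part of this change.

    // Illustrative sketch: `my_exporter`, the Tokio runtime, and the import
    // paths are assumptions; only `with_max_concurrent_exports` is new here.
    use opentelemetry::runtime;
    use opentelemetry::sdk::trace::BatchSpanProcessor;

    let processor = BatchSpanProcessor::builder(my_exporter, runtime::Tokio)
        // 0 = unbounded, 1 = fully synchronous, N = at most N exports in flight
        .with_max_concurrent_exports(4)
        .build();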
jwilm committed Apr 21, 2022
1 parent c93afc9 commit cc2175d
Showing 1 changed file with 27 additions and 14 deletions.
41 changes: 27 additions & 14 deletions opentelemetry-sdk/src/trace/span_processor.rs
@@ -66,6 +66,10 @@ const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
 const OTEL_BSP_EXPORT_TIMEOUT: &str = "OTEL_BSP_EXPORT_TIMEOUT";
 /// Default maximum allowed time to export data.
 const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: u64 = 30_000;
+/// Default max concurrent exports for BSP
+// TODO jwilm: I omitted the OTEL_ prefix here as this is a non-standard config
+// value. Should it be kept this way or prefix with OTEL_ for consistency?
+const BSP_MAX_CONCURRENT_EXPORTS: usize = 0;
 
 /// `SpanProcessor` is an interface which allows hooks for span start and end
 /// method invocations. The span processors are invoked only when is_recording
@@ -330,26 +334,25 @@ impl<R: TraceRuntime> BatchSpanProcessorInternal<R> {
 
                 if self.spans.len() == self.config.max_export_batch_size {
                     // If concurrent exports are saturated, wait for one to complete.
-                    // TODO jwilm: ignore max_concurrent_exports == 0
-                    if self.export_tasks.len() == self.config.max_concurrent_exports {
+                    if self.export_tasks.len() > 0
+                        && self.export_tasks.len() == self.config.max_concurrent_exports
+                    {
                         self.export_tasks.next().await;
                     }
 
-                    let task = self.export();
+                    let export_task = self.export();
+                    let task = async move {
+                        if let Err(err) = export_task.await {
+                            global::handle_error(err);
+                        }
 
+                        Ok(())
+                    };
                     // Special case when not using concurrent exports
                     if self.config.max_concurrent_exports == 1 {
-                        if let Err(err) = task.await {
-                            global::handle_error(err);
-                        }
+                        let _ = task.await;
                     } else {
-                        self.export_tasks.push(Box::pin(async move {
-                            if let Err(err) = task.await {
-                                global::handle_error(err);
-                            }
-
-                            Ok(())
-                        }));
+                        self.export_tasks.push(Box::pin(task));
                     }
                 }
             }
@@ -502,7 +505,7 @@ impl Default for BatchConfig {
             scheduled_delay: Duration::from_millis(OTEL_BSP_SCHEDULE_DELAY_DEFAULT),
             max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
             max_export_timeout: Duration::from_millis(OTEL_BSP_EXPORT_TIMEOUT_DEFAULT),
-            max_concurrent_exports: 16, // TODO jwilm
+            max_concurrent_exports: BSP_MAX_CONCURRENT_EXPORTS,
         };
 
         if let Some(max_queue_size) = env::var(OTEL_BSP_MAX_QUEUE_SIZE)
@@ -597,6 +600,16 @@ where
         BatchSpanProcessorBuilder { config, ..self }
     }
 
+    /// Set the maximum number of concurrent exports
+    ///
+    /// This setting may be useful for limiting network throughput or memory
+    /// consumption.
+    pub fn with_max_concurrent_exports(self, max: usize) -> Self {
+        let mut config = self.config;
+        config.max_concurrent_exports = max;
+        BatchSpanProcessorBuilder { config, ..self }
+    }
+
     /// Build a batch processor
     pub fn build(self) -> BatchSpanProcessor<R> {
         BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
