Skip to content

Commit

Permalink
Feat: add with_batch_processor_config for jaeger pipline (#869)
Browse files Browse the repository at this point in the history
* feat: add function to set BatchConfig

* feat: add set_batch_config to HasRequiredConfig trait

* feat: add with_batch_processor_config for CollectorPipeline

Signed-off-by: zengxilong <zengxilonglh@gmail.com>

* feat: add with_batch_processor_config for AgentPipeline

Signed-off-by: zengxilong <zengxilonglh@gmail.com>

* add test

Signed-off-by: zengxilong <zengxilonglh@gmail.com>

* feat: add comments

Signed-off-by: zengxilong <zengxilonglh@gmail.com>

* fix comments

Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>

Signed-off-by: zengxilong <zengxilonglh@gmail.com>
Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
  • Loading branch information
zengxilong and TommyCpp committed Sep 2, 2022
1 parent 9ea1ee7 commit 0e6e67e
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 4 deletions.
35 changes: 33 additions & 2 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Expand Up @@ -6,7 +6,7 @@ use crate::exporter::config::{
use crate::exporter::uploader::{AsyncUploader, SyncUploader, Uploader};
use crate::{Error, Exporter, JaegerTraceRuntime};
use opentelemetry::sdk;
use opentelemetry::sdk::trace::{Config, TracerProvider};
use opentelemetry::sdk::trace::{BatchConfig, Config, TracerProvider};
use opentelemetry::trace::TraceError;
use std::borrow::BorrowMut;
use std::sync::Arc;
Expand Down Expand Up @@ -72,6 +72,7 @@ const DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831";
pub struct AgentPipeline {
transformation_config: TransformationConfig,
trace_config: Option<sdk::trace::Config>,
batch_config: Option<sdk::trace::BatchConfig>,
agent_endpoint: Result<Vec<net::SocketAddr>, crate::Error>,
max_packet_size: usize,
auto_split_batch: bool,
Expand All @@ -82,6 +83,7 @@ impl Default for AgentPipeline {
let mut pipeline = AgentPipeline {
transformation_config: Default::default(),
trace_config: Default::default(),
batch_config: Some(Default::default()),
agent_endpoint: Ok(vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()]),
max_packet_size: UDP_PACKET_MAX_LENGTH,
auto_split_batch: false,
Expand All @@ -108,6 +110,10 @@ impl HasRequiredConfig for AgentPipeline {
fn set_trace_config(&mut self, config: Config) {
self.trace_config = Some(config)
}

fn set_batch_config(&mut self, config: BatchConfig) {
self.batch_config = Some(config)
}
}

/// Start a new pipeline to configure a exporter that target a jaeger agent.
Expand Down Expand Up @@ -226,6 +232,27 @@ impl AgentPipeline {
self
}

/// Assign the batch span processor for the exporter pipeline.
///
/// If a simple span processor is used by [`install_simple`][AgentPipeline::install_simple]
/// or [`build_simple`][AgentPipeline::install_simple], then this config will not be ignored.
///
/// # Examples
/// Set max queue size.
/// ```rust
/// use opentelemetry::sdk::trace::BatchConfig;
///
/// let pipeline = opentelemetry_jaeger::new_agent_pipeline()
/// .with_batch_processor_config(
/// BatchConfig::default().with_max_queue_size(200)
/// );
///
/// ```
pub fn with_batch_processor_config(mut self, config: BatchConfig) -> Self {
self.set_batch_config(config);
self
}

/// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline.
///
/// The exporter will send each span to the agent upon the span ends.
Expand Down Expand Up @@ -273,10 +300,14 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let batch_config = self.batch_config.take();
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let batch_processor = sdk::trace::BatchSpanProcessor::builder(exporter, runtime)
.with_batch_config(batch_config.unwrap_or_default())
.build();

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_span_processor(batch_processor);
builder = builder.with_config(config);

Ok(builder.build())
Expand Down
31 changes: 30 additions & 1 deletion opentelemetry-jaeger/src/exporter/config/collector/mod.rs
Expand Up @@ -5,6 +5,7 @@ use crate::exporter::config::{
use crate::exporter::uploader::{AsyncUploader, Uploader};
use crate::{Exporter, JaegerTraceRuntime};
use http::Uri;
use opentelemetry::sdk::trace::BatchConfig;
use opentelemetry::{sdk, sdk::trace::Config as TraceConfig, trace::TraceError};
use std::borrow::BorrowMut;
use std::convert::TryFrom;
Expand Down Expand Up @@ -91,6 +92,7 @@ const ENV_PASSWORD: &str = "OTEL_EXPORTER_JAEGER_PASSWORD";
pub struct CollectorPipeline {
transformation_config: TransformationConfig,
trace_config: Option<TraceConfig>,
batch_config: Option<BatchConfig>,

#[cfg(feature = "collector_client")]
collector_timeout: Duration,
Expand All @@ -113,6 +115,7 @@ impl Default for CollectorPipeline {
client_config: ClientConfig::default(),
transformation_config: Default::default(),
trace_config: Default::default(),
batch_config: Some(Default::default()),
};

#[cfg(feature = "collector_client")]
Expand Down Expand Up @@ -155,6 +158,10 @@ impl HasRequiredConfig for CollectorPipeline {
fn set_trace_config(&mut self, config: TraceConfig) {
self.trace_config = Some(config)
}

fn set_batch_config(&mut self, config: BatchConfig) {
self.batch_config = Some(config)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -398,6 +405,24 @@ impl CollectorPipeline {
self
}

/// Assign the batch span processor for the exporter pipeline.
///
/// # Examples
/// Set max queue size.
/// ```rust
/// use opentelemetry::sdk::trace::BatchConfig;
///
/// let pipeline = opentelemetry_jaeger::new_collector_pipeline()
/// .with_batch_processor_config(
/// BatchConfig::default().with_max_queue_size(200)
/// );
///
/// ```
pub fn with_batch_processor_config(mut self, config: BatchConfig) -> Self {
self.set_batch_config(config);
self
}

/// Build a `TracerProvider` using a async exporter and configurations from the pipeline.
///
/// The exporter will collect spans in a batch and send them to the agent.
Expand All @@ -423,10 +448,14 @@ impl CollectorPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let batch_config = self.batch_config.take();
let uploader = self.build_uploader::<R>()?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let batch_processor = sdk::trace::BatchSpanProcessor::builder(exporter, runtime)
.with_batch_config(batch_config.unwrap_or_default())
.build();

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_span_processor(batch_processor);
builder = builder.with_config(config);

Ok(builder.build())
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-jaeger/src/exporter/config/mod.rs
Expand Up @@ -36,13 +36,15 @@ impl Default for TransformationConfig {
}
}

// pipeline must have transformation config and trace config.
// pipeline must have transformation config, trace config and batch config.
trait HasRequiredConfig {
fn set_transformation_config<T>(&mut self, f: T)
where
T: FnOnce(&mut TransformationConfig);

fn set_trace_config(&mut self, config: sdk::trace::Config);

fn set_batch_config(&mut self, config: sdk::trace::BatchConfig);
}

// To reduce the overhead of copying service name in every spans. We convert resource into jaeger tags
Expand Down
68 changes: 68 additions & 0 deletions opentelemetry-sdk/src/trace/span_processor.rs
Expand Up @@ -554,6 +554,54 @@ impl Default for BatchConfig {
}
}

impl BatchConfig {
/// Set max_queue_size for [`BatchConfig`].
/// It's the maximum queue size to buffer spans for delayed processing.
/// If the queue gets full it will drops the spans.
/// The default value of is 2048.
pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
self.max_queue_size = max_queue_size;
self
}

/// Set max_export_batch_size for [`BatchConfig`].
/// It's the maximum number of spans to process in a single batch. If there are
/// more than one batch worth of spans then it processes multiple batches
/// of spans one batch after the other without any delay. The default value
/// is 512.
pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
self.max_export_batch_size = max_export_batch_size;
self
}

/// Set max_concurrent_exports for [`BatchConfig`].
/// It's the maximum number of concurrent exports.
/// Limits the number of spawned tasks for exports and thus memory consumed by an exporter.
/// The default value is 1.
/// IF the max_concurrent_exports value is default value, it will cause exports to be performed
/// synchronously on the BatchSpanProcessor task.
pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
self.max_concurrent_exports = max_concurrent_exports;
self
}

/// Set scheduled_delay_duration for [`BatchConfig`].
/// It's the delay interval in milliseconds between two consecutive processing of batches.
/// The default value is 5000 milliseconds.
pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
self.scheduled_delay = scheduled_delay;
self
}

/// Set max_export_timeout for [`BatchConfig`].
/// It's the maximum duration to export a batch of data.
/// The The default value is 30000 milliseconds.
pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
self.max_export_timeout = max_export_timeout;
self
}
}

/// A builder for creating [`BatchSpanProcessor`] instances.
///
#[derive(Debug)]
Expand Down Expand Up @@ -616,6 +664,11 @@ where
BatchSpanProcessorBuilder { config, ..self }
}

/// Set the BatchConfig for [BatchSpanProcessorBuilder]
pub fn with_batch_config(self, config: BatchConfig) -> Self {
BatchSpanProcessorBuilder { config, ..self }
}

/// Build a batch processor
pub fn build(self) -> BatchSpanProcessor<R> {
BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
Expand Down Expand Up @@ -657,6 +710,21 @@ mod tests {
assert!(rx_shutdown.try_recv().is_ok());
}

#[test]
fn test_batch_config_with_fields() {
let batch = BatchConfig::default()
.with_max_export_batch_size(10)
.with_scheduled_delay(Duration::from_millis(10))
.with_max_export_timeout(Duration::from_millis(10))
.with_max_concurrent_exports(10)
.with_max_queue_size(10);
assert_eq!(batch.max_export_batch_size, 10);
assert_eq!(batch.scheduled_delay, Duration::from_millis(10));
assert_eq!(batch.max_export_timeout, Duration::from_millis(10));
assert_eq!(batch.max_concurrent_exports, 10);
assert_eq!(batch.max_queue_size, 10);
}

#[test]
fn test_build_batch_span_processor_builder() {
std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");
Expand Down

0 comments on commit 0e6e67e

Please sign in to comment.