Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add with_batch_processor_config for jaeger pipeline #869

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 33 additions & 2 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Expand Up @@ -6,7 +6,7 @@ use crate::exporter::config::{
use crate::exporter::uploader::{AsyncUploader, SyncUploader, Uploader};
use crate::{Error, Exporter, JaegerTraceRuntime};
use opentelemetry::sdk;
use opentelemetry::sdk::trace::{Config, TracerProvider};
use opentelemetry::sdk::trace::{BatchConfig, Config, TracerProvider};
use opentelemetry::trace::TraceError;
use std::borrow::BorrowMut;
use std::sync::Arc;
Expand Down Expand Up @@ -72,6 +72,7 @@ const DEFAULT_AGENT_ENDPOINT: &str = "127.0.0.1:6831";
pub struct AgentPipeline {
transformation_config: TransformationConfig,
trace_config: Option<sdk::trace::Config>,
batch_config: Option<sdk::trace::BatchConfig>,
agent_endpoint: Result<Vec<net::SocketAddr>, crate::Error>,
max_packet_size: usize,
auto_split_batch: bool,
Expand All @@ -82,6 +83,7 @@ impl Default for AgentPipeline {
let mut pipeline = AgentPipeline {
transformation_config: Default::default(),
trace_config: Default::default(),
batch_config: Some(Default::default()),
agent_endpoint: Ok(vec![DEFAULT_AGENT_ENDPOINT.parse().unwrap()]),
max_packet_size: UDP_PACKET_MAX_LENGTH,
auto_split_batch: false,
Expand All @@ -108,6 +110,10 @@ impl HasRequiredConfig for AgentPipeline {
fn set_trace_config(&mut self, config: Config) {
self.trace_config = Some(config)
}

// Store the batch span processor config; it is later consumed via
// `self.batch_config.take()` when the batch exporter pipeline is built.
fn set_batch_config(&mut self, config: BatchConfig) {
self.batch_config = Some(config)
}
}

/// Start a new pipeline to configure a exporter that target a jaeger agent.
Expand Down Expand Up @@ -226,6 +232,27 @@ impl AgentPipeline {
self
}

/// Assign the batch span processor for the exporter pipeline.
zengxilong marked this conversation as resolved.
Show resolved Hide resolved
///
/// If a simple span processor is used by [`install_simple`][AgentPipeline::install_simple]
/// or [`build_simple`][AgentPipeline::install_simple], then this config will not be ignored.
///
/// # Examples
/// Set max queue size.
/// ```rust
/// use opentelemetry::sdk::trace::BatchConfig;
///
/// let pipeline = opentelemetry_jaeger::new_agent_pipeline()
/// .with_batch_processor_config(
/// BatchConfig::default().with_max_queue_size(200)
/// );
///
/// ```
pub fn with_batch_processor_config(mut self, config: BatchConfig) -> Self {
self.set_batch_config(config);
self
}

/// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline.
///
/// The exporter will send each span to the agent upon the span ends.
Expand Down Expand Up @@ -273,10 +300,14 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let batch_config = self.batch_config.take();
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let batch_processor = sdk::trace::BatchSpanProcessor::builder(exporter, runtime)
.with_batch_config(batch_config.unwrap_or_default())
.build();

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_span_processor(batch_processor);
builder = builder.with_config(config);

Ok(builder.build())
Expand Down
31 changes: 30 additions & 1 deletion opentelemetry-jaeger/src/exporter/config/collector/mod.rs
Expand Up @@ -5,6 +5,7 @@ use crate::exporter::config::{
use crate::exporter::uploader::{AsyncUploader, Uploader};
use crate::{Exporter, JaegerTraceRuntime};
use http::Uri;
use opentelemetry::sdk::trace::BatchConfig;
use opentelemetry::{sdk, sdk::trace::Config as TraceConfig, trace::TraceError};
use std::borrow::BorrowMut;
use std::convert::TryFrom;
Expand Down Expand Up @@ -91,6 +92,7 @@ const ENV_PASSWORD: &str = "OTEL_EXPORTER_JAEGER_PASSWORD";
pub struct CollectorPipeline {
transformation_config: TransformationConfig,
trace_config: Option<TraceConfig>,
batch_config: Option<BatchConfig>,

#[cfg(feature = "collector_client")]
collector_timeout: Duration,
Expand All @@ -113,6 +115,7 @@ impl Default for CollectorPipeline {
client_config: ClientConfig::default(),
transformation_config: Default::default(),
trace_config: Default::default(),
batch_config: Some(Default::default()),
};

#[cfg(feature = "collector_client")]
Expand Down Expand Up @@ -155,6 +158,10 @@ impl HasRequiredConfig for CollectorPipeline {
fn set_trace_config(&mut self, config: TraceConfig) {
self.trace_config = Some(config)
}

// Store the batch span processor config; it is later consumed via
// `self.batch_config.take()` when the batch exporter pipeline is built.
fn set_batch_config(&mut self, config: BatchConfig) {
self.batch_config = Some(config)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -398,6 +405,24 @@ impl CollectorPipeline {
self
}

/// Provide the [`BatchConfig`] used by the batch span processor of this pipeline.
///
/// # Examples
/// Limit the buffered span queue to 200 entries.
/// ```rust
/// use opentelemetry::sdk::trace::BatchConfig;
///
/// let pipeline = opentelemetry_jaeger::new_collector_pipeline()
///     .with_batch_processor_config(
///         BatchConfig::default().with_max_queue_size(200)
///     );
///
/// ```
pub fn with_batch_processor_config(mut self, config: BatchConfig) -> Self {
    self.set_batch_config(config);
    self
}

/// Build a `TracerProvider` using a async exporter and configurations from the pipeline.
///
/// The exporter will collect spans in a batch and send them to the agent.
Expand All @@ -423,10 +448,14 @@ impl CollectorPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let batch_config = self.batch_config.take();
let uploader = self.build_uploader::<R>()?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let batch_processor = sdk::trace::BatchSpanProcessor::builder(exporter, runtime)
.with_batch_config(batch_config.unwrap_or_default())
.build();

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_span_processor(batch_processor);
builder = builder.with_config(config);

Ok(builder.build())
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-jaeger/src/exporter/config/mod.rs
Expand Up @@ -36,13 +36,15 @@ impl Default for TransformationConfig {
}
}

// pipeline must have transformation config and trace config.
// pipeline must have transformation config, trace config and batch config.
trait HasRequiredConfig {
// Applies a caller-supplied closure to mutate the transformation config in place.
fn set_transformation_config<T>(&mut self, f: T)
where
T: FnOnce(&mut TransformationConfig);

// Replaces the SDK trace config carried by the pipeline.
fn set_trace_config(&mut self, config: sdk::trace::Config);

// Replaces the batch span processor config carried by the pipeline.
fn set_batch_config(&mut self, config: sdk::trace::BatchConfig);
}

// To reduce the overhead of copying service name in every spans. We convert resource into jaeger tags
Expand Down
68 changes: 68 additions & 0 deletions opentelemetry-sdk/src/trace/span_processor.rs
Expand Up @@ -554,6 +554,54 @@ impl Default for BatchConfig {
}
}

impl BatchConfig {
zengxilong marked this conversation as resolved.
Show resolved Hide resolved
/// Set max_queue_size for [`BatchConfig`].
/// It's the maximum queue size to buffer spans for delayed processing.
/// If the queue gets full it will drops the spans.
/// The default value of is 2048.
pub fn with_max_queue_size(mut self, max_queue_size: usize) -> Self {
self.max_queue_size = max_queue_size;
self
}

/// Set max_export_batch_size for [`BatchConfig`].
/// It's the maximum number of spans to process in a single batch. If there are
/// more than one batch worth of spans then it processes multiple batches
/// of spans one batch after the other without any delay. The default value
/// is 512.
pub fn with_max_export_batch_size(mut self, max_export_batch_size: usize) -> Self {
self.max_export_batch_size = max_export_batch_size;
self
}

/// Set max_concurrent_exports for [`BatchConfig`].
/// It's the maximum number of concurrent exports.
/// Limits the number of spawned tasks for exports and thus memory consumed by an exporter.
/// The default value is 1.
/// IF the max_concurrent_exports value is default value, it will cause exports to be performed
/// synchronously on the BatchSpanProcessor task.
pub fn with_max_concurrent_exports(mut self, max_concurrent_exports: usize) -> Self {
self.max_concurrent_exports = max_concurrent_exports;
self
}

/// Set scheduled_delay_duration for [`BatchConfig`].
/// It's the delay interval in milliseconds between two consecutive processing of batches.
/// The default value is 5000 milliseconds.
pub fn with_scheduled_delay(mut self, scheduled_delay: Duration) -> Self {
self.scheduled_delay = scheduled_delay;
self
}

/// Set max_export_timeout for [`BatchConfig`].
/// It's the maximum duration to export a batch of data.
/// The The default value is 30000 milliseconds.
pub fn with_max_export_timeout(mut self, max_export_timeout: Duration) -> Self {
self.max_export_timeout = max_export_timeout;
self
}
}

/// A builder for creating [`BatchSpanProcessor`] instances.
///
#[derive(Debug)]
Expand Down Expand Up @@ -616,6 +664,11 @@ where
BatchSpanProcessorBuilder { config, ..self }
}

/// Set the [`BatchConfig`] used by the resulting [`BatchSpanProcessor`].
pub fn with_batch_config(self, config: BatchConfig) -> Self {
    let mut builder = self;
    builder.config = config;
    builder
}

/// Build a batch processor
pub fn build(self) -> BatchSpanProcessor<R> {
BatchSpanProcessor::new(Box::new(self.exporter), self.config, self.runtime)
Expand Down Expand Up @@ -657,6 +710,21 @@ mod tests {
assert!(rx_shutdown.try_recv().is_ok());
}

#[test]
fn test_batch_config_with_fields() {
    // Every builder setter should write through to its matching field.
    let config = BatchConfig::default()
        .with_max_queue_size(10)
        .with_max_concurrent_exports(10)
        .with_max_export_timeout(Duration::from_millis(10))
        .with_scheduled_delay(Duration::from_millis(10))
        .with_max_export_batch_size(10);
    assert_eq!(config.max_queue_size, 10);
    assert_eq!(config.max_concurrent_exports, 10);
    assert_eq!(config.max_export_timeout, Duration::from_millis(10));
    assert_eq!(config.scheduled_delay, Duration::from_millis(10));
    assert_eq!(config.max_export_batch_size, 10);
}

#[test]
fn test_build_batch_span_processor_builder() {
std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");
Expand Down