Skip to content

Commit

Permalink
Support concurrent exports (#781)
Browse files Browse the repository at this point in the history
* Add support for concurrent exports

Applications generating significant span volume can end up dropping data
due to the synchronous export step. According to the opentelemetry spec,

    This function will never be called concurrently for the same exporter
    instance. It can be called again only after the current call returns.

However, it does not place a restriction on concurrent I/O or anything
of that nature. There is an [ongoing discussion] about tweaking the
language to make this more clear.

With that in mind, this commit makes the exporters return a future that
can be spawned concurrently. Unfortunately, this means that the
`export()` method can no longer be async while taking &mut self. The
latter is desirable to enforce the no concurrent calls line of the spec,
so the choice is made here to return a future instead with the lifetime
decoupled from self. This resulted in a bit of additional verbosity, but
for the most part the async code can still be shoved into an async fn
for the ergonomics.

The main exception to this is the `jaeger` exporter which internally
requires a bunch of mutable references. I plan to discuss with the
opentelemetry team the overall goal of this PR and get buy-in before
making more invasive changes to support this in the jaeger exporter.

[ongoing discussion]: open-telemetry/opentelemetry-specification#2434

* SpanProcessor directly manages concurrent exports

Prior, export tasks were run in "fire and forget" mode with
runtime::spawn. SpanProcessor now manages tasks directly using
FuturesUnordered. This enables limiting overall concurrency (and thus
memory footprint). Additionally, flush and shutdown logic now spawn an
additional task for any unexported spans and wait on _all_ outstanding
tasks to complete before returning.

* Add configuration for BSP max_concurrent_exports

Users may desire to control the level of export concurrency in the batch
span processor. There are two special values:

    max_concurrent_exports = 0: no bound on concurrency
    max_concurrent_exports = 1: no concurrency, makes everything
    synchronous on the messaging task.

* Implement new SpanExporter API for Jaeger

Key points
- decouple exporter from uploaders via channel and spawned task
- some uploaders are a shared I/O resource and cannot be multiplexed
    - necessitates a task queue
    - eg, HttpClient will spawn many I/O tasks internally, AgentUploader
      is a single I/O resource. Different level of abstraction.
- Synchronous API not supported without a Runtime argument. I updated
  the API to thread one through, but maybe this is undesirable. I'm also
  exploiting the fact in the Actix examples that it uses Tokio under the
  hood to pass through the Tokio runtime token.
- Tests pass save for a couple of flakey environment ones which is
  likely a race condition.

* Reduce dependencies on futures

The minimal necessary futures library (core, util, futures proper) is
now used in all packages touched by the concurrent exporters work.

* Remove runtime from Jaeger's install_simple

To keep the API _actually_ simple, we now leverage a thread to run the
jaeger exporter internals.

* Add Arc lost in a rebase

* Fix OTEL_BSP_MAX_CONCURRENT_EXPORTS name and value

Per PR feedback, the default should match the previous behavior of 1
batch at a time.

* Fix remaining TODOs

This finishes the remaining TODOs on the concurrent-exports branch. The
major change included here adds shutdown functionality to the jaeger
exporter which ensures the exporter has finished its tasks before
exiting.

* Restore lint.sh script

This was erroneously committed.

* Make max concurrent exports env configurable

OTEL_BSP_MAX_CONCURRENT_EXPORTS may now be specified in the environment
to configure the number of max concurrent exports. This configurable now
has parity with the other options of the span_processor.
  • Loading branch information
jwilm committed May 17, 2022
1 parent 02e15b2 commit 7534891
Show file tree
Hide file tree
Showing 20 changed files with 523 additions and 258 deletions.
1 change: 1 addition & 0 deletions opentelemetry-datadog/Cargo.toml
Expand Up @@ -36,6 +36,7 @@ thiserror = "1.0"
itertools = "0.10"
http = "0.2"
lazy_static = "1.4"
futures-core = "0.3"

[dev-dependencies]
base64 = "0.13"
Expand Down
74 changes: 45 additions & 29 deletions opentelemetry-datadog/src/exporter/mod.rs
Expand Up @@ -9,7 +9,7 @@ use std::borrow::Cow;
use std::fmt::{Debug, Formatter};

use crate::exporter::model::FieldMapping;
use async_trait::async_trait;
use futures_core::future::BoxFuture;
use http::{Method, Request, Uri};
use itertools::Itertools;
use opentelemetry::sdk::export::trace;
Expand All @@ -34,7 +34,7 @@ const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count";

/// Datadog span exporter
pub struct DatadogExporter {
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
request_url: Uri,
model_config: ModelConfig,
version: ApiVersion,
Expand All @@ -49,7 +49,7 @@ impl DatadogExporter {
model_config: ModelConfig,
request_url: Uri,
version: ApiVersion,
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
resource_mapping: Option<FieldMapping>,
name_mapping: Option<FieldMapping>,
service_name_mapping: Option<FieldMapping>,
Expand All @@ -64,6 +64,27 @@ impl DatadogExporter {
service_name_mapping,
}
}

fn build_request(&self, batch: Vec<SpanData>) -> Result<http::Request<Vec<u8>>, TraceError> {
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
let trace_count = traces.len();
let data = self.version.encode(
&self.model_config,
traces,
self.service_name_mapping.clone(),
self.name_mapping.clone(),
self.resource_mapping.clone(),
)?;
let req = Request::builder()
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
.body(data)
.map_err::<Error, _>(Into::into)?;

Ok(req)
}
}

impl Debug for DatadogExporter {
Expand Down Expand Up @@ -94,8 +115,7 @@ pub struct DatadogPipelineBuilder {
agent_endpoint: String,
trace_config: Option<sdk::trace::Config>,
version: ApiVersion,
client: Option<Box<dyn HttpClient>>,

client: Option<Arc<dyn HttpClient>>,
resource_mapping: Option<FieldMapping>,
name_mapping: Option<FieldMapping>,
service_name_mapping: Option<FieldMapping>,
Expand All @@ -122,15 +142,15 @@ impl Default for DatadogPipelineBuilder {
not(feature = "reqwest-blocking-client"),
feature = "surf-client"
))]
client: Some(Box::new(surf::Client::new())),
client: Some(Arc::new(surf::Client::new())),
#[cfg(all(
not(feature = "surf-client"),
not(feature = "reqwest-blocking-client"),
feature = "reqwest-client"
))]
client: Some(Box::new(reqwest::Client::new())),
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Box::new(reqwest::blocking::Client::new())),
client: Some(Arc::new(reqwest::blocking::Client::new())),
}
}
}
Expand Down Expand Up @@ -296,7 +316,7 @@ impl DatadogPipelineBuilder {
/// Choose the http client used by uploader
pub fn with_http_client<T: HttpClient + 'static>(
mut self,
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
) -> Self {
self.client = Some(client);
self
Expand Down Expand Up @@ -354,28 +374,24 @@ fn group_into_traces(spans: Vec<SpanData>) -> Vec<Vec<SpanData>> {
.collect()
}

#[async_trait]
async fn send_request(
client: Arc<dyn HttpClient>,
request: http::Request<Vec<u8>>,
) -> trace::ExportResult {
let _ = client.send(request).await?.error_for_status()?;
Ok(())
}

impl trace::SpanExporter for DatadogExporter {
/// Export spans to datadog-agent
async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
let trace_count = traces.len();
let data = self.version.encode(
&self.model_config,
traces,
self.service_name_mapping.clone(),
self.name_mapping.clone(),
self.resource_mapping.clone(),
)?;
let req = Request::builder()
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
.body(data)
.map_err::<Error, _>(Into::into)?;
let _ = self.client.send(req).await?.error_for_status()?;
Ok(())
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let request = match self.build_request(batch) {
Ok(req) => req,
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let client = self.client.clone();
Box::pin(send_request(client, request))
}
}

Expand Down
15 changes: 15 additions & 0 deletions opentelemetry-jaeger/Cargo.toml
Expand Up @@ -22,7 +22,9 @@ rustdoc-args = ["--cfg", "docsrs"]
async-std = { version = "1.6", optional = true }
async-trait = "0.1"
base64 = { version = "0.13", optional = true }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true }
futures-executor = "0.3"
headers = { version = "0.3.2", optional = true }
http = { version = "0.2", optional = true }
isahc = { version = "1.4", default-features = false, optional = true }
Expand All @@ -45,6 +47,7 @@ prost = { version = "0.9.0", optional = true }
prost-types = { version = "0.9.0", optional = true }

[dev-dependencies]
tokio = { version = "1.0", features = ["net", "sync"] }
bytes = "1"
futures-executor = "0.3"
opentelemetry = { default-features = false, features = ["trace", "testing"], path = "../opentelemetry" }
Expand All @@ -63,6 +66,18 @@ features = [
optional = true

[features]
full = [
"collector_client",
"isahc_collector_client",
"reqwest_collector_client",
"reqwest_blocking_collector_client",
"surf_collector_client",
"wasm_collector_client",
"rt-tokio",
"rt-tokio-current-thread",
"rt-async-std",
"integration_test"
]
default = []
collector_client = ["http", "opentelemetry-http"]
isahc_collector_client = ["isahc", "opentelemetry-http/isahc"]
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-jaeger/src/exporter/config/mod.rs
Expand Up @@ -104,8 +104,8 @@ mod tests {
assert_eq!(process.tags.len(), 2);
}

#[test]
fn test_read_from_env() {
#[tokio::test]
async fn test_read_from_env() {
// OTEL_SERVICE_NAME env var also works
env::set_var("OTEL_SERVICE_NAME", "test service");
let builder = new_agent_pipeline();
Expand Down
107 changes: 88 additions & 19 deletions opentelemetry-jaeger/src/exporter/mod.rs
Expand Up @@ -18,7 +18,9 @@ use std::convert::TryFrom;

use self::runtime::JaegerTraceRuntime;
use self::thrift::jaeger;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::StreamExt;
use std::convert::TryInto;

#[cfg(feature = "isahc_collector_client")]
Expand All @@ -42,13 +44,27 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name";
/// Instrument Library version MUST be reported in Jaeger Span tags with the following key
const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";

#[derive(Debug)]
enum ExportMessage {
Export {
batch: Vec<trace::SpanData>,
tx: oneshot::Sender<trace::ExportResult>,
},
Shutdown,
}

/// Jaeger span exporter
#[derive(Debug)]
pub struct Exporter {
tx: mpsc::Sender<ExportMessage>,

// In the switch to concurrent exports, the non-test code which used this
// value was moved into the ExporterTask implementation. However, there's
// still a test that relies on this value being here, thus the
// allow(dead_code).
#[allow(dead_code)]
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Exporter {
Expand All @@ -57,10 +73,61 @@ impl Exporter {
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
) -> Exporter {
Exporter {
process,
let (tx, rx) = futures::channel::mpsc::channel(64);

let exporter_task = ExporterTask {
rx,
export_instrumentation_lib,
uploader,
process: process.clone(),
};

let join_handle = Some(std::thread::spawn(move || {
futures_executor::block_on(exporter_task.run());
}));

Exporter {
tx,
process,
join_handle,
}
}
}

struct ExporterTask {
rx: mpsc::Receiver<ExportMessage>,
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
}

impl ExporterTask {
async fn run(mut self) {
while let Some(message) = self.rx.next().await {
match message {
ExportMessage::Export { batch, tx } => {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

let res = self
.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await;

// Errors here might be completely expected if the receiver didn't
// care about the result.
let _ = tx.send(res);
}
ExportMessage::Shutdown => break,
}
}
}
}
Expand All @@ -74,23 +141,25 @@ pub struct Process {
pub tags: Vec<KeyValue>,
}

#[async_trait]
impl trace::SpanExporter for Exporter {
/// Export spans to Jaeger
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let (tx, rx) = oneshot::channel();

if let Err(err) = self.tx.try_send(ExportMessage::Export { batch, tx }) {
return Box::pin(futures::future::ready(Err(Into::into(err))));
}

self.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await
Box::pin(async move { rx.await? })
}

fn shutdown(&mut self) {
let _ = self.tx.try_send(ExportMessage::Shutdown);

// This has the potential to block indefinitely, but as long as all of
// the tasks processed by ExportTask have a timeout, this should join
// eventually.
self.join_handle.take().map(|handle| handle.join());
}
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-jaeger/src/lib.rs
Expand Up @@ -25,7 +25,8 @@
//! use opentelemetry::trace::Tracer;
//! use opentelemetry::global;
//!
//! fn main() -> Result<(), opentelemetry::trace::TraceError> {
//! #[tokio::main]
//! async fn main() -> Result<(), opentelemetry::trace::TraceError> {
//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?;
//!
Expand Down
11 changes: 6 additions & 5 deletions opentelemetry-otlp/src/exporter/http.rs
@@ -1,6 +1,7 @@
use crate::{ExportConfig, Protocol};
use opentelemetry_http::HttpClient;
use std::collections::HashMap;
use std::sync::Arc;

/// Configuration of the http transport
#[cfg(feature = "http-proto")]
Expand All @@ -15,7 +16,7 @@ use std::collections::HashMap;
)]
pub struct HttpConfig {
/// Select the HTTP client
pub client: Option<Box<dyn HttpClient>>,
pub client: Option<Arc<dyn HttpClient>>,

/// Additional headers to send to the collector.
pub headers: Option<HashMap<String, String>>,
Expand All @@ -30,19 +31,19 @@ impl Default for HttpConfig {
fn default() -> Self {
HttpConfig {
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Box::new(reqwest::blocking::Client::new())),
client: Some(Arc::new(reqwest::blocking::Client::new())),
#[cfg(all(
not(feature = "reqwest-blocking-client"),
not(feature = "surf-client"),
feature = "reqwest-client"
))]
client: Some(Box::new(reqwest::Client::new())),
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "surf-client"
))]
client: Some(Box::new(surf::Client::new())),
client: Some(Arc::new(surf::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "surf-client"),
Expand Down Expand Up @@ -78,7 +79,7 @@ impl Default for HttpExporterBuilder {
impl HttpExporterBuilder {
/// Assign client implementation
pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
self.http_config.client = Some(Box::new(client));
self.http_config.client = Some(Arc::new(client));
self
}

Expand Down

0 comments on commit 7534891

Please sign in to comment.