Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support concurrent exports #781

Merged
merged 11 commits into from May 17, 2022
1 change: 1 addition & 0 deletions opentelemetry-datadog/Cargo.toml
Expand Up @@ -36,6 +36,7 @@ thiserror = "1.0"
itertools = "0.10"
http = "0.2"
lazy_static = "1.4"
futures-core = "0.3"

[dev-dependencies]
base64 = "0.13"
Expand Down
74 changes: 45 additions & 29 deletions opentelemetry-datadog/src/exporter/mod.rs
Expand Up @@ -9,7 +9,7 @@ use std::borrow::Cow;
use std::fmt::{Debug, Formatter};

use crate::exporter::model::FieldMapping;
use async_trait::async_trait;
use futures_core::future::BoxFuture;
use http::{Method, Request, Uri};
use itertools::Itertools;
use opentelemetry::sdk::export::trace;
Expand All @@ -34,7 +34,7 @@ const DATADOG_TRACE_COUNT_HEADER: &str = "X-Datadog-Trace-Count";

/// Datadog span exporter
pub struct DatadogExporter {
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
request_url: Uri,
model_config: ModelConfig,
version: ApiVersion,
Expand All @@ -49,7 +49,7 @@ impl DatadogExporter {
model_config: ModelConfig,
request_url: Uri,
version: ApiVersion,
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
resource_mapping: Option<FieldMapping>,
name_mapping: Option<FieldMapping>,
service_name_mapping: Option<FieldMapping>,
Expand All @@ -64,6 +64,27 @@ impl DatadogExporter {
service_name_mapping,
}
}

fn build_request(&self, batch: Vec<SpanData>) -> Result<http::Request<Vec<u8>>, TraceError> {
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
let trace_count = traces.len();
let data = self.version.encode(
&self.model_config,
traces,
self.service_name_mapping.clone(),
self.name_mapping.clone(),
self.resource_mapping.clone(),
)?;
let req = Request::builder()
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
.body(data)
.map_err::<Error, _>(Into::into)?;

Ok(req)
}
}

impl Debug for DatadogExporter {
Expand Down Expand Up @@ -94,8 +115,7 @@ pub struct DatadogPipelineBuilder {
agent_endpoint: String,
trace_config: Option<sdk::trace::Config>,
version: ApiVersion,
client: Option<Box<dyn HttpClient>>,

client: Option<Arc<dyn HttpClient>>,
resource_mapping: Option<FieldMapping>,
name_mapping: Option<FieldMapping>,
service_name_mapping: Option<FieldMapping>,
Expand All @@ -122,15 +142,15 @@ impl Default for DatadogPipelineBuilder {
not(feature = "reqwest-blocking-client"),
feature = "surf-client"
))]
client: Some(Box::new(surf::Client::new())),
client: Some(Arc::new(surf::Client::new())),
#[cfg(all(
not(feature = "surf-client"),
not(feature = "reqwest-blocking-client"),
feature = "reqwest-client"
))]
client: Some(Box::new(reqwest::Client::new())),
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Box::new(reqwest::blocking::Client::new())),
client: Some(Arc::new(reqwest::blocking::Client::new())),
}
}
}
Expand Down Expand Up @@ -296,7 +316,7 @@ impl DatadogPipelineBuilder {
/// Choose the http client used by uploader
pub fn with_http_client<T: HttpClient + 'static>(
mut self,
client: Box<dyn HttpClient>,
client: Arc<dyn HttpClient>,
) -> Self {
self.client = Some(client);
self
Expand Down Expand Up @@ -354,28 +374,24 @@ fn group_into_traces(spans: Vec<SpanData>) -> Vec<Vec<SpanData>> {
.collect()
}

#[async_trait]
async fn send_request(
client: Arc<dyn HttpClient>,
request: http::Request<Vec<u8>>,
) -> trace::ExportResult {
let _ = client.send(request).await?.error_for_status()?;
Ok(())
}

impl trace::SpanExporter for DatadogExporter {
/// Export spans to datadog-agent
async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
let trace_count = traces.len();
let data = self.version.encode(
&self.model_config,
traces,
self.service_name_mapping.clone(),
self.name_mapping.clone(),
self.resource_mapping.clone(),
)?;
let req = Request::builder()
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.header(DATADOG_TRACE_COUNT_HEADER, trace_count)
.body(data)
.map_err::<Error, _>(Into::into)?;
let _ = self.client.send(req).await?.error_for_status()?;
Ok(())
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let request = match self.build_request(batch) {
Ok(req) => req,
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let client = self.client.clone();
Box::pin(send_request(client, request))
}
}

Expand Down
15 changes: 15 additions & 0 deletions opentelemetry-jaeger/Cargo.toml
Expand Up @@ -22,7 +22,9 @@ rustdoc-args = ["--cfg", "docsrs"]
async-std = { version = "1.6", optional = true }
async-trait = "0.1"
base64 = { version = "0.13", optional = true }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true }
futures-executor = "0.3"
headers = { version = "0.3.2", optional = true }
http = { version = "0.2", optional = true }
isahc = { version = "1.4", default-features = false, optional = true }
Expand All @@ -45,6 +47,7 @@ prost = { version = "0.9.0", optional = true }
prost-types = { version = "0.9.0", optional = true }

[dev-dependencies]
tokio = { version = "1.0", features = ["net", "sync"] }
bytes = "1"
futures-executor = "0.3"
opentelemetry = { default-features = false, features = ["trace", "testing"], path = "../opentelemetry" }
Expand All @@ -63,6 +66,18 @@ features = [
optional = true

[features]
full = [
"collector_client",
"isahc_collector_client",
"reqwest_collector_client",
"reqwest_blocking_collector_client",
"surf_collector_client",
"wasm_collector_client",
"rt-tokio",
"rt-tokio-current-thread",
"rt-async-std",
"integration_test"
]
default = []
collector_client = ["http", "opentelemetry-http"]
isahc_collector_client = ["isahc", "opentelemetry-http/isahc"]
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-jaeger/src/exporter/config/mod.rs
Expand Up @@ -104,8 +104,8 @@ mod tests {
assert_eq!(process.tags.len(), 2);
}

#[test]
fn test_read_from_env() {
#[tokio::test]
async fn test_read_from_env() {
// OTEL_SERVICE_NAME env var also works
env::set_var("OTEL_SERVICE_NAME", "test service");
let builder = new_agent_pipeline();
Expand Down
107 changes: 88 additions & 19 deletions opentelemetry-jaeger/src/exporter/mod.rs
Expand Up @@ -18,7 +18,9 @@ use std::convert::TryFrom;

use self::runtime::JaegerTraceRuntime;
use self::thrift::jaeger;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::StreamExt;
use std::convert::TryInto;

#[cfg(feature = "isahc_collector_client")]
Expand All @@ -42,13 +44,27 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name";
/// Instrument Library version MUST be reported in Jaeger Span tags with the following key
const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";

#[derive(Debug)]
enum ExportMessage {
Export {
batch: Vec<trace::SpanData>,
tx: oneshot::Sender<trace::ExportResult>,
},
Shutdown,
}

/// Jaeger span exporter
#[derive(Debug)]
pub struct Exporter {
tx: mpsc::Sender<ExportMessage>,

// In the switch to concurrent exports, the non-test code which used this
// value was moved into the ExporterTask implementation. However, there's
// still a test that relies on this value being here, thus the
// allow(dead_code).
#[allow(dead_code)]
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Exporter {
Expand All @@ -57,10 +73,61 @@ impl Exporter {
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
) -> Exporter {
Exporter {
process,
let (tx, rx) = futures::channel::mpsc::channel(64);

let exporter_task = ExporterTask {
rx,
export_instrumentation_lib,
uploader,
process: process.clone(),
};

let join_handle = Some(std::thread::spawn(move || {
futures_executor::block_on(exporter_task.run());
}));

Exporter {
tx,
process,
join_handle,
}
}
}

struct ExporterTask {
rx: mpsc::Receiver<ExportMessage>,
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
}

impl ExporterTask {
async fn run(mut self) {
while let Some(message) = self.rx.next().await {
match message {
ExportMessage::Export { batch, tx } => {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

let res = self
.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await;

// Errors here might be completely expected if the receiver didn't
// care about the result.
let _ = tx.send(res);
}
ExportMessage::Shutdown => break,
}
}
}
}
Expand All @@ -74,23 +141,25 @@ pub struct Process {
pub tags: Vec<KeyValue>,
}

#[async_trait]
impl trace::SpanExporter for Exporter {
/// Export spans to Jaeger
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let (tx, rx) = oneshot::channel();

if let Err(err) = self.tx.try_send(ExportMessage::Export { batch, tx }) {
return Box::pin(futures::future::ready(Err(Into::into(err))));
}

self.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await
Box::pin(async move { rx.await? })
}

fn shutdown(&mut self) {
let _ = self.tx.try_send(ExportMessage::Shutdown);

// This has the potential to block indefinitely, but as long as all of
// the tasks processed by ExportTask have a timeout, this should join
// eventually.
self.join_handle.take().map(|handle| handle.join());
}
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-jaeger/src/lib.rs
Expand Up @@ -25,7 +25,8 @@
//! use opentelemetry::trace::Tracer;
//! use opentelemetry::global;
//!
//! fn main() -> Result<(), opentelemetry::trace::TraceError> {
//! #[tokio::main]
//! async fn main() -> Result<(), opentelemetry::trace::TraceError> {
//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?;
//!
Expand Down
11 changes: 6 additions & 5 deletions opentelemetry-otlp/src/exporter/http.rs
@@ -1,6 +1,7 @@
use crate::{ExportConfig, Protocol};
use opentelemetry_http::HttpClient;
use std::collections::HashMap;
use std::sync::Arc;

/// Configuration of the http transport
#[cfg(feature = "http-proto")]
Expand All @@ -15,7 +16,7 @@ use std::collections::HashMap;
)]
pub struct HttpConfig {
/// Select the HTTP client
pub client: Option<Box<dyn HttpClient>>,
pub client: Option<Arc<dyn HttpClient>>,

/// Additional headers to send to the collector.
pub headers: Option<HashMap<String, String>>,
Expand All @@ -30,19 +31,19 @@ impl Default for HttpConfig {
fn default() -> Self {
HttpConfig {
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Box::new(reqwest::blocking::Client::new())),
client: Some(Arc::new(reqwest::blocking::Client::new())),
#[cfg(all(
not(feature = "reqwest-blocking-client"),
not(feature = "surf-client"),
feature = "reqwest-client"
))]
client: Some(Box::new(reqwest::Client::new())),
client: Some(Arc::new(reqwest::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "surf-client"
))]
client: Some(Box::new(surf::Client::new())),
client: Some(Arc::new(surf::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "surf-client"),
Expand Down Expand Up @@ -78,7 +79,7 @@ impl Default for HttpExporterBuilder {
impl HttpExporterBuilder {
/// Assign client implementation
pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
self.http_config.client = Some(Box::new(client));
self.http_config.client = Some(Arc::new(client));
self
}

Expand Down