Skip to content

Commit

Permalink
feat(jaeger): remove internal message queue between exporter and expo…
Browse files Browse the repository at this point in the history
…rting tasks (#848)
  • Loading branch information
TommyCpp committed Jul 25, 2022
1 parent f20c9b4 commit d65d545
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 136 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-api/Cargo.toml
Expand Up @@ -10,7 +10,7 @@ futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] }
indexmap = "=1.8"
once_cell = "1.12.0"
pin-project-lite = { version = "0.2.9", optional = true }
pin-project-lite = { version = "0.2", optional = true }
thiserror = "1"
tokio-stream = { version = "0.1", optional = true }

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-jaeger/Cargo.toml
Expand Up @@ -33,11 +33,11 @@ once_cell = "1.12"
opentelemetry = { version = "0.17", default-features = false, features = ["trace"], path = "../opentelemetry" }
opentelemetry-http = { version = "0.6", path = "../opentelemetry-http", optional = true }
opentelemetry-semantic-conventions = { version = "0.9", path = "../opentelemetry-semantic-conventions" }
pin-project-lite = { version = "0.2.9", optional = true }
pin-project-lite = { version = "0.2", optional = true }
reqwest = { version = "0.11", default-features = false, optional = true }
surf = { version = "2.0", optional = true }
thiserror = "1.0"
thrift = "0.15"
thrift = "0.16"
tokio = { version = "1.0", features = ["net", "sync"], optional = true }
wasm-bindgen = { version = "0.2", optional = true }
wasm-bindgen-futures = { version = "0.4.18", optional = true }
Expand Down
27 changes: 12 additions & 15 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Expand Up @@ -9,6 +9,7 @@ use opentelemetry::sdk;
use opentelemetry::sdk::trace::{Config, TracerProvider};
use opentelemetry::trace::TraceError;
use std::borrow::BorrowMut;
use std::sync::Arc;
use std::{env, net};

/// The max size of UDP packet we want to send, synced with jaeger-agent
Expand Down Expand Up @@ -235,7 +236,7 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let exporter = Exporter::new_sync(
let exporter = Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
Expand Down Expand Up @@ -273,12 +274,7 @@ impl AgentPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let exporter = Exporter::new_async(
process.into(),
export_instrument_library,
runtime.clone(),
uploader,
);
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand Down Expand Up @@ -322,11 +318,10 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
Ok(Exporter::new_async(
let uploader = self.build_async_agent_uploader(runtime)?;
Ok(Exporter::new(
process.into(),
export_instrument_library,
runtime,
uploader,
))
}
Expand All @@ -337,14 +332,14 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Ok(Exporter::new_sync(
Ok(Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
))
}

fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Box<dyn Uploader>, TraceError>
fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Arc<dyn Uploader>, TraceError>
where
R: JaegerTraceRuntime,
{
Expand All @@ -355,17 +350,19 @@ impl AgentPipeline {
self.auto_split_batch,
)
.map_err::<Error, _>(Into::into)?;
Ok(Box::new(AsyncUploader::Agent(agent)))
Ok(Arc::new(AsyncUploader::Agent(futures::lock::Mutex::new(
agent,
))))
}

fn build_sync_agent_uploader(self) -> Result<Box<dyn Uploader>, TraceError> {
fn build_sync_agent_uploader(self) -> Result<Arc<dyn Uploader>, TraceError> {
let agent = AgentSyncClientUdp::new(
self.agent_endpoint?.as_slice(),
self.max_packet_size,
self.auto_split_batch,
)
.map_err::<Error, _>(Into::into)?;
Ok(Box::new(SyncUploader::Agent(agent)))
Ok(Arc::new(SyncUploader::Agent(std::sync::Mutex::new(agent))))
}
}

Expand Down
Expand Up @@ -187,7 +187,7 @@ mod collector_client_tests {
.with_endpoint("localhost:6831")
.with_http_client(test_http_client::TestHttpClient);
let (_, process) = build_config_and_process(None, None);
let mut uploader = invalid_uri_builder.build_uploader::<Tokio>()?;
let uploader = invalid_uri_builder.build_uploader::<Tokio>()?;
let res = futures_executor::block_on(async {
uploader
.upload(Batch::new(process.into(), Vec::new()))
Expand Down
14 changes: 5 additions & 9 deletions opentelemetry-jaeger/src/exporter/config/collector/mod.rs
Expand Up @@ -9,6 +9,7 @@ use opentelemetry::{sdk, sdk::trace::Config as TraceConfig, trace::TraceError};
use std::borrow::BorrowMut;
use std::convert::TryFrom;
use std::env;
use std::sync::Arc;
#[cfg(feature = "collector_client")]
use std::time::Duration;

Expand Down Expand Up @@ -411,12 +412,7 @@ impl CollectorPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_uploader::<R>()?;
let exporter = Exporter::new_async(
process.into(),
export_instrument_library,
runtime.clone(),
uploader,
);
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand All @@ -436,7 +432,7 @@ impl CollectorPipeline {
install_tracer_provider_and_get_tracer(tracer_provider)
}

fn build_uploader<R>(self) -> Result<Box<dyn Uploader>, crate::Error>
fn build_uploader<R>(self) -> Result<Arc<dyn Uploader>, crate::Error>
where
R: JaegerTraceRuntime,
{
Expand All @@ -461,14 +457,14 @@ impl CollectorPipeline {
)?;

let collector = AsyncHttpClient::new(endpoint, client);
Ok(Box::new(AsyncUploader::<R>::Collector(collector)))
Ok(Arc::new(AsyncUploader::<R>::Collector(collector)))
}
#[cfg(feature = "wasm_collector_client")]
ClientConfig::Wasm => {
let collector =
WasmCollector::new(endpoint, self.collector_username, self.collector_password)
.map_err::<crate::Error, _>(Into::into)?;
Ok(Box::new(AsyncUploader::<R>::WasmCollector(collector)))
Ok(Arc::new(AsyncUploader::<R>::WasmCollector(collector)))
}
}
}
Expand Down
124 changes: 22 additions & 102 deletions opentelemetry-jaeger/src/exporter/mod.rs
Expand Up @@ -18,10 +18,9 @@ use std::convert::TryFrom;

use self::runtime::JaegerTraceRuntime;
use self::thrift::jaeger;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::StreamExt;
use std::convert::TryInto;
use std::sync::Arc;

#[cfg(feature = "isahc_collector_client")]
#[allow(unused_imports)] // this is actually used to configure authentication
Expand All @@ -44,108 +43,25 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name";
/// Instrument Library version MUST be reported in Jaeger Span tags with the following key
const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";

#[derive(Debug)]
enum ExportMessage {
Export {
batch: Vec<trace::SpanData>,
tx: oneshot::Sender<trace::ExportResult>,
},
Shutdown,
}

/// Jaeger span exporter
#[derive(Debug)]
pub struct Exporter {
tx: mpsc::Sender<ExportMessage>,

// In the switch to concurrent exports, the non-test code which used this
// value was moved into the ExporterTask implementation. However, there's
// still a test that relies on this value being here, thus the
// allow(dead_code).
#[allow(dead_code)]
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Arc<dyn Uploader>,
process: jaeger::Process,
}

impl Exporter {
fn new_async<R>(
process: jaeger::Process,
export_instrumentation_lib: bool,
runtime: R,
uploader: Box<dyn Uploader>,
) -> Exporter
where
R: JaegerTraceRuntime,
{
let (tx, rx) = mpsc::channel(64);

let exporter_task = ExporterTask {
rx,
export_instrumentation_lib,
uploader,
process: process.clone(),
};

runtime.spawn(Box::pin(exporter_task.run()));

Exporter { tx, process }
}

fn new_sync(
fn new(
process: jaeger::Process,
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
uploader: Arc<dyn Uploader>,
) -> Exporter {
let (tx, rx) = mpsc::channel(64);

let exporter_task = ExporterTask {
rx,
Exporter {
export_instrumentation_lib,
uploader,
process: process.clone(),
};

std::thread::spawn(move || {
futures_executor::block_on(exporter_task.run());
});

Exporter { tx, process }
}
}

struct ExporterTask {
rx: mpsc::Receiver<ExportMessage>,
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
}

impl ExporterTask {
async fn run(mut self) {
while let Some(message) = self.rx.next().await {
match message {
ExportMessage::Export { batch, tx } => {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

let res = self
.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await;

// Errors here might be completely expected if the receiver didn't
// care about the result.
let _ = tx.send(res);
}
ExportMessage::Shutdown => break,
}
process,
}
}
}
Expand All @@ -160,19 +76,23 @@ pub struct Process {
}

impl trace::SpanExporter for Exporter {
/// Export spans to Jaeger
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let (tx, rx) = oneshot::channel();

if let Err(err) = self.tx.try_send(ExportMessage::Export { batch, tx }) {
return Box::pin(futures::future::ready(Err(Into::into(err))));
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

Box::pin(async move { rx.await? })
}

fn shutdown(&mut self) {
let _ = self.tx.try_send(ExportMessage::Shutdown);
let uploader = self.uploader.clone();
Box::pin(async move {
uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await
})
}
}

Expand Down
16 changes: 10 additions & 6 deletions opentelemetry-jaeger/src/exporter/uploader.rs
Expand Up @@ -11,22 +11,24 @@ use crate::exporter::thrift::jaeger::Batch;
use crate::exporter::JaegerTraceRuntime;

#[async_trait]
pub(crate) trait Uploader: std::fmt::Debug + Send {
async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult;
pub(crate) trait Uploader: Debug + Send + Sync {
async fn upload(&self, batch: jaeger::Batch) -> trace::ExportResult;
}

#[derive(Debug)]
pub(crate) enum SyncUploader {
Agent(agent::AgentSyncClientUdp),
Agent(std::sync::Mutex<agent::AgentSyncClientUdp>),
}

#[async_trait]
impl Uploader for SyncUploader {
async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult {
async fn upload(&self, batch: jaeger::Batch) -> trace::ExportResult {
match self {
SyncUploader::Agent(client) => {
// TODO Implement retry behaviour
client
.lock()
.expect("Failed to lock agent client")
.emit_batch(batch)
.map_err::<crate::Error, _>(Into::into)?;
}
Expand All @@ -39,7 +41,7 @@ impl Uploader for SyncUploader {
#[derive(Debug)]
pub(crate) enum AsyncUploader<R: JaegerTraceRuntime> {
/// Agent async client
Agent(agent::AgentAsyncClientUdp<R>),
Agent(futures::lock::Mutex<agent::AgentAsyncClientUdp<R>>),
/// Collector sync client
#[cfg(feature = "collector_client")]
Collector(collector::AsyncHttpClient),
Expand All @@ -49,11 +51,13 @@ pub(crate) enum AsyncUploader<R: JaegerTraceRuntime> {

#[async_trait]
impl<R: JaegerTraceRuntime> Uploader for AsyncUploader<R> {
async fn upload(&mut self, batch: Batch) -> ExportResult {
async fn upload(&self, batch: Batch) -> ExportResult {
match self {
Self::Agent(client) => {
// TODO Implement retry behaviour
client
.lock()
.await
.emit_batch(batch)
.await
.map_err::<crate::Error, _>(Into::into)?;
Expand Down

0 comments on commit d65d545

Please sign in to comment.