Skip to content

Commit

Permalink
Fix remaining TODOs
Browse files Browse the repository at this point in the history
This finishes the remaining TODOs on the concurrent-exports branch. The
major change included here adds shutdown functionality to the jaeger
exporter which ensures the exporter has finished its tasks before
exiting.
  • Loading branch information
jwilm committed May 6, 2022
1 parent b4e72bc commit 24f81b7
Showing 1 changed file with 58 additions and 27 deletions.
85 changes: 58 additions & 27 deletions opentelemetry-jaeger/src/exporter/mod.rs
Expand Up @@ -44,14 +44,27 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name";
/// Instrument Library version MUST be reported in Jaeger Span tags with the following key
const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";

#[derive(Debug)]
enum ExportMessage {
Export {
batch: Vec<trace::SpanData>,
tx: oneshot::Sender<trace::ExportResult>,
},
Shutdown,
}

/// Jaeger span exporter
#[derive(Debug)]
pub struct Exporter {
tx: mpsc::Sender<(Vec<trace::SpanData>, oneshot::Sender<trace::ExportResult>)>,
// TODO jwilm: this is used in an existing test to check that the service
// name is properly read from the environment.
tx: mpsc::Sender<ExportMessage>,

// In the switch to concurrent exports, the non-test code which used this
// value was moved into the ExporterTask implementation. However, there's
// still a test that relies on this value being here, thus the
// allow(dead_code).
#[allow(dead_code)]
process: jaeger::Process,
join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Exporter {
Expand All @@ -68,16 +81,21 @@ impl Exporter {
uploader,
process: process.clone(),
};
std::thread::spawn(move || {

let join_handle = Some(std::thread::spawn(move || {
futures_executor::block_on(exporter_task.run());
});
}));

Exporter { tx, process }
Exporter {
tx,
process,
join_handle,
}
}
}

struct ExporterTask {
rx: mpsc::Receiver<(Vec<trace::SpanData>, oneshot::Sender<trace::ExportResult>)>,
rx: mpsc::Receiver<ExportMessage>,
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
Expand All @@ -86,26 +104,30 @@ struct ExporterTask {

impl ExporterTask {
async fn run(mut self) {
// TODO jwilm: this might benefit from a ExporterMessage so that we can
// send Shutdown and break the loop.
while let Some((batch, tx)) = self.rx.next().await {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
while let Some(message) = self.rx.next().await {
match message {
ExportMessage::Export { batch, tx } => {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

let res = self
.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await;

// Errors here might be completely expected if the receiver didn't
// care about the result.
let _ = tx.send(res);
}
ExportMessage::Shutdown => break,
}

let res = self
.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await;

// TODO jwilm: is ignoring the err (fail to send) correct here?
let _ = tx.send(res);
}
}
}
Expand All @@ -124,12 +146,21 @@ impl trace::SpanExporter for Exporter {
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let (tx, rx) = oneshot::channel();

if let Err(err) = self.tx.try_send((batch, tx)) {
if let Err(err) = self.tx.try_send(ExportMessage::Export { batch, tx }) {
return Box::pin(futures::future::ready(Err(Into::into(err))));
}

Box::pin(async move { rx.await? })
}

fn shutdown(&mut self) {
let _ = self.tx.try_send(ExportMessage::Shutdown);

// This has the potential to block indefinitely, but as long as all of
// the tasks processed by ExportTask have a timeout, this should join
// eventually.
self.join_handle.take().map(|handle| handle.join());
}
}

fn links_to_references(links: sdk::trace::EvictedQueue<Link>) -> Option<Vec<jaeger::SpanRef>> {
Expand Down

0 comments on commit 24f81b7

Please sign in to comment.