Skip to content

Commit

Permalink
fix(jaeger): reqwest client runs inside a non-tokio runtime (#829)
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp committed Jul 4, 2022
1 parent 703059a commit 20e46ee
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 21 deletions.
16 changes: 11 additions & 5 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let exporter = Exporter::new(
let exporter = Exporter::new_sync(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
Expand Down Expand Up @@ -273,7 +273,12 @@ impl AgentPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let exporter = Exporter::new_async(
process.into(),
export_instrument_library,
runtime.clone(),
uploader,
);

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand Down Expand Up @@ -317,10 +322,11 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime)?;
Ok(Exporter::new(
let uploader = self.build_async_agent_uploader(runtime.clone())?;
Ok(Exporter::new_async(
process.into(),
export_instrument_library,
runtime,
uploader,
))
}
Expand All @@ -331,7 +337,7 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Ok(Exporter::new(
Ok(Exporter::new_sync(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
Expand Down
7 changes: 6 additions & 1 deletion opentelemetry-jaeger/src/exporter/config/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,12 @@ impl CollectorPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_uploader::<R>()?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let exporter = Exporter::new_async(
process.into(),
export_instrument_library,
runtime.clone(),
uploader,
);

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand Down
43 changes: 28 additions & 15 deletions opentelemetry-jaeger/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,38 @@ pub struct Exporter {
// allow(dead_code).
#[allow(dead_code)]
process: jaeger::Process,
join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Exporter {
fn new(
fn new_async<R>(
process: jaeger::Process,
export_instrumentation_lib: bool,
runtime: R,
uploader: Box<dyn Uploader>,
) -> Exporter
where
R: JaegerTraceRuntime,
{
let (tx, rx) = mpsc::channel(64);

let exporter_task = ExporterTask {
rx,
export_instrumentation_lib,
uploader,
process: process.clone(),
};

runtime.spawn(Box::pin(exporter_task.run()));

Exporter { tx, process }
}

fn new_sync(
process: jaeger::Process,
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
) -> Exporter {
let (tx, rx) = futures::channel::mpsc::channel(64);
let (tx, rx) = mpsc::channel(64);

let exporter_task = ExporterTask {
rx,
Expand All @@ -82,15 +104,11 @@ impl Exporter {
process: process.clone(),
};

let join_handle = Some(std::thread::spawn(move || {
std::thread::spawn(move || {
futures_executor::block_on(exporter_task.run());
}));
});

Exporter {
tx,
process,
join_handle,
}
Exporter { tx, process }
}
}

Expand Down Expand Up @@ -155,11 +173,6 @@ impl trace::SpanExporter for Exporter {

fn shutdown(&mut self) {
let _ = self.tx.try_send(ExportMessage::Shutdown);

// This has the potential to block indefinitely, but as long as all of
// the tasks processed by ExportTask have a timeout, this should join
// eventually.
self.join_handle.take().map(|handle| handle.join());
}
}

Expand Down

0 comments on commit 20e46ee

Please sign in to comment.