diff --git a/opentelemetry-api/Cargo.toml b/opentelemetry-api/Cargo.toml index bbed1957da..bf26664ae9 100644 --- a/opentelemetry-api/Cargo.toml +++ b/opentelemetry-api/Cargo.toml @@ -10,7 +10,7 @@ futures-channel = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std", "sink"] } indexmap = "=1.8" once_cell = "1.12.0" -pin-project-lite = { version = "0.2.9", optional = true } +pin-project-lite = { version = "0.2", optional = true } thiserror = "1" tokio-stream = { version = "0.1", optional = true } diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index 787dd9730f..d75d4555af 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -33,11 +33,11 @@ once_cell = "1.12" opentelemetry = { version = "0.17", default-features = false, features = ["trace"], path = "../opentelemetry" } opentelemetry-http = { version = "0.6", path = "../opentelemetry-http", optional = true } opentelemetry-semantic-conventions = { version = "0.9", path = "../opentelemetry-semantic-conventions" } -pin-project-lite = { version = "0.2.9", optional = true } +pin-project-lite = { version = "0.2", optional = true } reqwest = { version = "0.11", default-features = false, optional = true } surf = { version = "2.0", optional = true } thiserror = "1.0" -thrift = "0.15" +thrift = "0.16" tokio = { version = "1.0", features = ["net", "sync"], optional = true } wasm-bindgen = { version = "0.2", optional = true } wasm-bindgen-futures = { version = "0.4.18", optional = true } diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs b/opentelemetry-jaeger/src/exporter/config/agent.rs index 22e51afb56..2b20f403df 100644 --- a/opentelemetry-jaeger/src/exporter/config/agent.rs +++ b/opentelemetry-jaeger/src/exporter/config/agent.rs @@ -9,6 +9,7 @@ use opentelemetry::sdk; use opentelemetry::sdk::trace::{Config, TracerProvider}; use opentelemetry::trace::TraceError; use std::borrow::BorrowMut; +use std::sync::Arc; use std::{env, net}; /// The max size of UDP packet we want to send, synced with jaeger-agent @@ -235,7 +236,7 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let exporter = Exporter::new_sync( + let exporter = Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, @@ -273,12 +274,7 @@ impl AgentPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_async_agent_uploader(runtime.clone())?; - let exporter = Exporter::new_async( - process.into(), - export_instrument_library, - runtime.clone(), - uploader, - ); + let exporter = Exporter::new(process.into(), export_instrument_library, uploader); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); @@ -322,11 +318,10 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let uploader = self.build_async_agent_uploader(runtime.clone())?; - Ok(Exporter::new_async( + let uploader = self.build_async_agent_uploader(runtime)?; + Ok(Exporter::new( process.into(), export_instrument_library, - runtime, uploader, )) } @@ -337,14 +332,14 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - Ok(Exporter::new_sync( + Ok(Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, )) } - fn build_async_agent_uploader(self, runtime: R) -> Result, TraceError> + fn build_async_agent_uploader(self, runtime: R) -> Result, TraceError> where R: JaegerTraceRuntime, { @@ -355,17 +350,19 @@ impl AgentPipeline { self.auto_split_batch, ) .map_err::(Into::into)?; - Ok(Box::new(AsyncUploader::Agent(agent))) + Ok(Arc::new(AsyncUploader::Agent(futures::lock::Mutex::new( + agent, + )))) } - fn build_sync_agent_uploader(self) -> Result, TraceError> { + fn build_sync_agent_uploader(self) -> Result, TraceError> { let agent = AgentSyncClientUdp::new( self.agent_endpoint?.as_slice(), self.max_packet_size, self.auto_split_batch, ) .map_err::(Into::into)?; - Ok(Box::new(SyncUploader::Agent(agent))) + Ok(Arc::new(SyncUploader::Agent(std::sync::Mutex::new(agent)))) } } diff --git a/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs b/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs index eaf8d989e9..a7319ccc8f 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/http_client.rs @@ -187,7 +187,7 @@ mod collector_client_tests { .with_endpoint("localhost:6831") .with_http_client(test_http_client::TestHttpClient); let (_, process) = build_config_and_process(None, None); - let mut uploader = invalid_uri_builder.build_uploader::()?; + let uploader = invalid_uri_builder.build_uploader::()?; let res = futures_executor::block_on(async { uploader .upload(Batch::new(process.into(), Vec::new())) diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs index 3b1c517058..56d2264a29 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs @@ -9,6 +9,7 @@ use opentelemetry::{sdk, sdk::trace::Config as TraceConfig, trace::TraceError}; use std::borrow::BorrowMut; use std::convert::TryFrom; use std::env; +use std::sync::Arc; #[cfg(feature = "collector_client")] use std::time::Duration; @@ -411,12 +412,7 @@ impl CollectorPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_uploader::()?; - let exporter = Exporter::new_async( - process.into(), - export_instrument_library, - runtime.clone(), - uploader, - ); + let exporter = Exporter::new(process.into(), export_instrument_library, uploader); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); @@ -436,7 +432,7 @@ impl CollectorPipeline { install_tracer_provider_and_get_tracer(tracer_provider) } - fn build_uploader(self) -> Result, crate::Error> + fn build_uploader(self) -> Result, crate::Error> where R: JaegerTraceRuntime, { @@ -461,14 +457,14 @@ impl CollectorPipeline { )?; let collector = AsyncHttpClient::new(endpoint, client); - Ok(Box::new(AsyncUploader::::Collector(collector))) + Ok(Arc::new(AsyncUploader::::Collector(collector))) } #[cfg(feature = "wasm_collector_client")] ClientConfig::Wasm => { let collector = WasmCollector::new(endpoint, self.collector_username, self.collector_password) .map_err::(Into::into)?; - Ok(Box::new(AsyncUploader::::WasmCollector(collector))) + Ok(Arc::new(AsyncUploader::::WasmCollector(collector))) } } } diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 2a9c16f56a..e7e6c852b9 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -18,10 +18,9 @@ use std::convert::TryFrom; use self::runtime::JaegerTraceRuntime; use self::thrift::jaeger; -use futures::channel::{mpsc, oneshot}; use futures::future::BoxFuture; -use futures::StreamExt; use std::convert::TryInto; +use std::sync::Arc; #[cfg(feature = "isahc_collector_client")] #[allow(unused_imports)] // this is actually used to configure authentication @@ -44,108 +43,25 @@ const INSTRUMENTATION_LIBRARY_NAME: &str = "otel.library.name"; /// Instrument Library version MUST be reported in Jaeger Span tags with the following key const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; -#[derive(Debug)] -enum ExportMessage { - Export { - batch: Vec, - tx: oneshot::Sender, - }, - Shutdown, -} - /// Jaeger span exporter #[derive(Debug)] pub struct Exporter { - tx: mpsc::Sender, - - // In the switch to concurrent exports, the non-test code which used this - // value was moved into the ExporterTask implementation. However, there's - // still a test that relies on this value being here, thus the - // allow(dead_code). - #[allow(dead_code)] + /// Whether or not to export instrumentation information. + export_instrumentation_lib: bool, + uploader: Arc, process: jaeger::Process, } impl Exporter { - fn new_async( - process: jaeger::Process, - export_instrumentation_lib: bool, - runtime: R, - uploader: Box, - ) -> Exporter - where - R: JaegerTraceRuntime, - { - let (tx, rx) = mpsc::channel(64); - - let exporter_task = ExporterTask { - rx, - export_instrumentation_lib, - uploader, - process: process.clone(), - }; - - runtime.spawn(Box::pin(exporter_task.run())); - - Exporter { tx, process } - } - - fn new_sync( + fn new( process: jaeger::Process, export_instrumentation_lib: bool, - uploader: Box, + uploader: Arc, ) -> Exporter { - let (tx, rx) = mpsc::channel(64); - - let exporter_task = ExporterTask { - rx, + Exporter { export_instrumentation_lib, uploader, - process: process.clone(), - }; - - std::thread::spawn(move || { - futures_executor::block_on(exporter_task.run()); - }); - - Exporter { tx, process } - } -} - -struct ExporterTask { - rx: mpsc::Receiver, - process: jaeger::Process, - /// Whether or not to export instrumentation information. - export_instrumentation_lib: bool, - uploader: Box, -} - -impl ExporterTask { - async fn run(mut self) { - while let Some(message) = self.rx.next().await { - match message { - ExportMessage::Export { batch, tx } => { - let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); - let process = self.process.clone(); - - for span in batch.into_iter() { - jaeger_spans.push(convert_otel_span_into_jaeger_span( - span, - self.export_instrumentation_lib, - )); - } - - let res = self - .uploader - .upload(jaeger::Batch::new(process, jaeger_spans)) - .await; - - // Errors here might be completely expected if the receiver didn't - // care about the result. - let _ = tx.send(res); - } - ExportMessage::Shutdown => break, - } + process, } } } @@ -160,19 +76,23 @@ pub struct Process { } impl trace::SpanExporter for Exporter { - /// Export spans to Jaeger fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { - let (tx, rx) = oneshot::channel(); - - if let Err(err) = self.tx.try_send(ExportMessage::Export { batch, tx }) { - return Box::pin(futures::future::ready(Err(Into::into(err)))); + let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); + let process = self.process.clone(); + + for span in batch.into_iter() { + jaeger_spans.push(convert_otel_span_into_jaeger_span( + span, + self.export_instrumentation_lib, + )); } - Box::pin(async move { rx.await? }) - } - - fn shutdown(&mut self) { - let _ = self.tx.try_send(ExportMessage::Shutdown); + let uploader = self.uploader.clone(); + Box::pin(async move { + uploader + .upload(jaeger::Batch::new(process, jaeger_spans)) + .await + }) } } diff --git a/opentelemetry-jaeger/src/exporter/uploader.rs b/opentelemetry-jaeger/src/exporter/uploader.rs index fd17aab72f..b789f24eed 100644 --- a/opentelemetry-jaeger/src/exporter/uploader.rs +++ b/opentelemetry-jaeger/src/exporter/uploader.rs @@ -11,22 +11,24 @@ use crate::exporter::thrift::jaeger::Batch; use crate::exporter::JaegerTraceRuntime; #[async_trait] -pub(crate) trait Uploader: std::fmt::Debug + Send { - async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult; +pub(crate) trait Uploader: Debug + Send + Sync { + async fn upload(&self, batch: jaeger::Batch) -> trace::ExportResult; } #[derive(Debug)] pub(crate) enum SyncUploader { - Agent(agent::AgentSyncClientUdp), + Agent(std::sync::Mutex), } #[async_trait] impl Uploader for SyncUploader { - async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult { + async fn upload(&self, batch: jaeger::Batch) -> trace::ExportResult { match self { SyncUploader::Agent(client) => { // TODO Implement retry behaviour client + .lock() + .expect("Failed to lock agent client") .emit_batch(batch) .map_err::(Into::into)?; } @@ -39,7 +41,7 @@ impl Uploader for SyncUploader { #[derive(Debug)] pub(crate) enum AsyncUploader { /// Agent async client - Agent(agent::AgentAsyncClientUdp), + Agent(futures::lock::Mutex>), /// Collector sync client #[cfg(feature = "collector_client")] Collector(collector::AsyncHttpClient), @@ -49,11 +51,13 @@ pub(crate) enum AsyncUploader { #[async_trait] impl Uploader for AsyncUploader { - async fn upload(&mut self, batch: Batch) -> ExportResult { + async fn upload(&self, batch: Batch) -> ExportResult { match self { Self::Agent(client) => { // TODO Implement retry behaviour client + .lock() + .await .emit_batch(batch) .await .map_err::(Into::into)?;