From 0617ca897cc7a220345553a10dabd39accb84bc9 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Thu, 21 Apr 2022 10:49:30 -0700 Subject: [PATCH] [WIP] Implement new SpanExporter API for Jaeger Key points - decouple exporter from uploaders via channel and spawned task - some uploaders are a shared I/O resource and cannot be multiplexed - necessitates a task queue - eg, HttpClient will spawn many I/O tasks internally, AgentUploader is a single I/O resource. Different level of abstraction. - Synchronous API not supported without a Runtime argument. I updated the API to thread one through, but maybe this is undesirable. I'm also exploiting the fact in the Actix examples that it uses Tokio under the hood to pass through the Tokio runtime token. - Tests pass save for a couple of flakey environment ones which is likely a race condition. --- Cargo.toml | 18 ++-- examples/actix-udp/src/main.rs | 8 +- examples/grpc/src/client.rs | 8 +- examples/grpc/src/server.rs | 8 +- opentelemetry-jaeger/Cargo.toml | 2 + .../src/exporter/config/agent.rs | 43 ++++++---- .../src/exporter/config/collector/mod.rs | 3 +- .../src/exporter/config/mod.rs | 8 +- opentelemetry-jaeger/src/exporter/mod.rs | 84 ++++++++++++++----- opentelemetry-jaeger/src/lib.rs | 6 +- 10 files changed, 128 insertions(+), 60 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a595369927..b7eaef93f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ "opentelemetry-datadog", "opentelemetry-dynatrace", "opentelemetry-http", -# "opentelemetry-jaeger", + "opentelemetry-jaeger", "opentelemetry-otlp", "opentelemetry-prometheus", "opentelemetry-proto", @@ -16,24 +16,24 @@ members = [ "opentelemetry-stackdriver", "opentelemetry-zipkin", "opentelemetry-zpages", -# "examples/actix-http", -# "examples/actix-http-tracing", -# "examples/actix-udp", -# "examples/async", + "examples/actix-http", + "examples/actix-http-tracing", + "examples/actix-udp", + "examples/async", "examples/aws-xray", -# "examples/basic", + "examples/basic", "examples/basic-otlp", "examples/basic-otlp-with-selector", "examples/basic-otlp-http", "examples/datadog", "examples/dynatrace", "examples/external-otlp-tonic-tokio", -# "examples/grpc", + "examples/grpc", "examples/http", "examples/hyper-prometheus", -# "examples/tracing-grpc", + "examples/tracing-grpc", "examples/zipkin", -# "examples/multiple-span-processors", + "examples/multiple-span-processors", "examples/zpages" ] exclude = ["examples/external-otlp-grpcio-async-std"] diff --git a/examples/actix-udp/src/main.rs b/examples/actix-udp/src/main.rs index 2a741b92a1..cac9801fb0 100644 --- a/examples/actix-udp/src/main.rs +++ b/examples/actix-udp/src/main.rs @@ -1,14 +1,16 @@ use actix_service::Service; use actix_web::middleware::Logger; use actix_web::{web, App, HttpServer}; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{global, sdk::trace as sdktrace}; use opentelemetry::{ trace::{FutureExt, TraceContextExt, Tracer}, Key, }; +use opentelemetry_jaeger::JaegerTraceRuntime; -fn init_tracer() -> Result { +fn init_tracer(runtime: R) -> Result { opentelemetry_jaeger::new_agent_pipeline() .with_endpoint("localhost:6831") .with_service_name("trace-udp-demo") @@ -19,7 +21,7 @@ fn init_tracer() -> Result { opentelemetry::KeyValue::new("exporter", "jaeger"), ]), )) - .install_simple() + .install_simple(runtime) } async fn index() -> &'static str { @@ -34,7 +36,7 @@ async fn index() -> &'static str { async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "debug"); env_logger::init(); - let _tracer = init_tracer().expect("Failed to initialise tracer."); + let _tracer = init_tracer(Tokio).expect("Failed to initialise tracer."); HttpServer::new(|| { App::new() diff --git a/examples/grpc/src/client.rs b/examples/grpc/src/client.rs index df2e65ab14..51668cd9b8 100644 --- a/examples/grpc/src/client.rs +++ b/examples/grpc/src/client.rs @@ -3,6 +3,7 @@ use hello_world::HelloRequest; use opentelemetry::global; use opentelemetry::global::shutdown_tracer_provider; use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceResult; use opentelemetry::{ propagation::Injector, @@ -10,6 +11,7 @@ use opentelemetry::{ trace::{TraceContextExt, Tracer as _}, Context, KeyValue, }; +use opentelemetry_jaeger::JaegerTraceRuntime; struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap); @@ -28,16 +30,16 @@ pub mod hello_world { tonic::include_proto!("helloworld"); } -fn tracing_init() -> TraceResult { +fn tracing_init(runtime: R) -> TraceResult { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-client") - .install_simple() + .install_simple(runtime) } #[tokio::main] async fn main() -> Result<(), Box> { - let tracer = tracing_init()?; + let tracer = tracing_init(Tokio)?; let mut client = GreeterClient::connect("http://[::1]:50051").await?; let span = tracer.start("client-request"); let cx = Context::current_with_span(span); diff --git a/examples/grpc/src/server.rs b/examples/grpc/src/server.rs index b4f02b5e2e..a454e5f7d8 100644 --- a/examples/grpc/src/server.rs +++ b/examples/grpc/src/server.rs @@ -4,12 +4,14 @@ use hello_world::greeter_server::{Greeter, GreeterServer}; use hello_world::{HelloReply, HelloRequest}; use opentelemetry::global; use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{ propagation::Extractor, trace::{Span, Tracer}, KeyValue, }; +use opentelemetry_jaeger::JaegerTraceRuntime; use std::error::Error; pub mod hello_world { @@ -59,16 +61,16 @@ impl Greeter for MyGreeter { } } -fn tracing_init() -> Result { +fn tracing_init(runtime: R) -> Result { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-server") - .install_simple() + .install_simple(runtime) } #[tokio::main] async fn main() -> Result<(), Box> { - let _tracer = tracing_init()?; + let _tracer = tracing_init(Tokio)?; let addr = "[::1]:50051".parse()?; let greeter = MyGreeter::default(); diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index f020cb87ca..f37308ac35 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -22,6 +22,8 @@ rustdoc-args = ["--cfg", "docsrs"] async-std = { version = "1.6", optional = true } async-trait = "0.1" base64 = { version = "0.13", optional = true } +futures = "0.3" +futures-channel = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true } headers = { version = "0.3.2", optional = true } http = { version = "0.2", optional = true } diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs b/opentelemetry-jaeger/src/exporter/config/agent.rs index efefc17a9b..bd4dfeb4a6 100644 --- a/opentelemetry-jaeger/src/exporter/config/agent.rs +++ b/opentelemetry-jaeger/src/exporter/config/agent.rs @@ -228,7 +228,10 @@ impl AgentPipeline { /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline. /// /// The exporter will send each span to the agent upon the span ends. - pub fn build_simple(mut self) -> Result { + pub fn build_simple( + mut self, + runtime: R, + ) -> Result { let mut builder = sdk::trace::TracerProvider::builder(); let (config, process) = build_config_and_process( @@ -236,12 +239,13 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let exporter = Exporter::new( + let (exporter, task) = Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, ); + runtime.spawn(Box::pin(task)); builder = builder.with_simple_exporter(exporter); builder = builder.with_config(config); @@ -275,7 +279,9 @@ impl AgentPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_async_agent_uploader(runtime.clone())?; - let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); @@ -287,8 +293,11 @@ impl AgentPipeline { /// tracer provider. /// /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate. - pub fn install_simple(self) -> Result { - let tracer_provider = self.build_simple()?; + pub fn install_simple( + self, + runtime: R, + ) -> Result { + let tracer_provider = self.build_simple(runtime)?; install_tracer_provider_and_get_tracer(tracer_provider) } @@ -321,27 +330,32 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let uploader = self.build_async_agent_uploader(runtime)?; - Ok(Exporter::new( - process.into(), - export_instrument_library, - uploader, - )) + let uploader = self.build_async_agent_uploader(runtime.clone())?; + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); + Ok(exporter) } /// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime. - pub fn build_sync_agent_exporter(mut self) -> Result { + pub fn build_sync_agent_exporter( + mut self, + runtime: R, + ) -> Result { let builder = sdk::trace::TracerProvider::builder(); let (_, process) = build_config_and_process( builder.sdk_provided_resource(), self.trace_config.take(), self.transformation_config.service_name.take(), ); - Ok(Exporter::new( + let (exporter, task) = Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, - )) + ); + + runtime.spawn(Box::pin(task)); + Ok(exporter) } fn build_async_agent_uploader(self, runtime: R) -> Result, TraceError> @@ -358,6 +372,7 @@ impl AgentPipeline { Ok(Box::new(AsyncUploader::Agent(agent))) } + #[allow(dead_code)] // TODO jwilm fn build_sync_agent_uploader(self) -> Result, TraceError> { let agent = AgentSyncClientUdp::new( self.agent_endpoint?.as_slice(), diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs index 205c142695..df9ff23891 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs @@ -412,8 +412,9 @@ impl CollectorPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_uploader::()?; - let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); diff --git a/opentelemetry-jaeger/src/exporter/config/mod.rs b/opentelemetry-jaeger/src/exporter/config/mod.rs index a363c0270a..a2621ee278 100644 --- a/opentelemetry-jaeger/src/exporter/config/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/mod.rs @@ -148,12 +148,14 @@ mod tests { ); } - #[test] - fn test_read_from_env() { + #[tokio::test] + async fn test_read_from_env() { // OTEL_SERVICE_NAME env var also works env::set_var("OTEL_SERVICE_NAME", "test service"); let builder = new_agent_pipeline(); - let exporter = builder.build_sync_agent_exporter().unwrap(); + let exporter = builder + .build_sync_agent_exporter(opentelemetry::runtime::Tokio) + .unwrap(); assert_eq!(exporter.process.service_name, "test service"); env::set_var("OTEL_SERVICE_NAME", "") } diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 0d64a739ea..458c9b1790 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -18,8 +18,11 @@ use std::convert::TryFrom; use self::runtime::JaegerTraceRuntime; use self::thrift::jaeger; -use async_trait::async_trait; +use futures::channel::{mpsc, oneshot}; +use futures::future::BoxFuture; +use futures::StreamExt; use std::convert::TryInto; +use std::future::Future; #[cfg(feature = "isahc_collector_client")] #[allow(unused_imports)] // this is actually used to configure authentication @@ -45,10 +48,11 @@ const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; /// Jaeger span exporter #[derive(Debug)] pub struct Exporter { + tx: mpsc::Sender<(Vec, oneshot::Sender)>, + // TODO jwilm: this is used in an existing test to check that the service + // name is properly read from the environment. + #[allow(dead_code)] process: jaeger::Process, - /// Whether or not to export instrumentation information. - export_instrumentation_lib: bool, - uploader: Box, } impl Exporter { @@ -56,11 +60,54 @@ impl Exporter { process: jaeger::Process, export_instrumentation_lib: bool, uploader: Box, - ) -> Exporter { - Exporter { - process, - export_instrumentation_lib, - uploader, + ) -> (Exporter, impl Future) { + let (tx, rx) = futures::channel::mpsc::channel(64); + ( + Exporter { + tx, + process: process.clone(), + }, + ExporterTask { + rx, + export_instrumentation_lib, + uploader, + process, + } + .run(), + ) + } +} + +struct ExporterTask { + rx: mpsc::Receiver<(Vec, oneshot::Sender)>, + process: jaeger::Process, + /// Whether or not to export instrumentation information. + export_instrumentation_lib: bool, + uploader: Box, +} + +impl ExporterTask { + async fn run(mut self) { + // TODO jwilm: this might benefit from a ExporterMessage so that we can + // send Shutdown and break the loop. + while let Some((batch, tx)) = self.rx.next().await { + let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); + let process = self.process.clone(); + + for span in batch.into_iter() { + jaeger_spans.push(convert_otel_span_into_jaeger_span( + span, + self.export_instrumentation_lib, + )); + } + + let res = self + .uploader + .upload(jaeger::Batch::new(process, jaeger_spans)) + .await; + + // TODO jwilm: is ignoring the err (fail to send) correct here? + let _ = tx.send(res); } } } @@ -74,23 +121,16 @@ pub struct Process { pub tags: Vec, } -#[async_trait] impl trace::SpanExporter for Exporter { /// Export spans to Jaeger - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); - let process = self.process.clone(); - - for span in batch.into_iter() { - jaeger_spans.push(convert_otel_span_into_jaeger_span( - span, - self.export_instrumentation_lib, - )); + fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { + let (tx, rx) = oneshot::channel(); + + if let Err(err) = self.tx.try_send((batch, tx)) { + return Box::pin(futures::future::ready(Err(Into::into(err)))); } - self.uploader - .upload(jaeger::Batch::new(process, jaeger_spans)) - .await + Box::pin(async move { rx.await? }) } } diff --git a/opentelemetry-jaeger/src/lib.rs b/opentelemetry-jaeger/src/lib.rs index c921e0672f..e1ffecb902 100644 --- a/opentelemetry-jaeger/src/lib.rs +++ b/opentelemetry-jaeger/src/lib.rs @@ -24,10 +24,12 @@ //! ```no_run //! use opentelemetry::trace::Tracer; //! use opentelemetry::global; +//! use opentelemetry::runtime::Tokio; //! -//! fn main() -> Result<(), opentelemetry::trace::TraceError> { +//! #[tokio::main] +//! async fn main() -> Result<(), opentelemetry::trace::TraceError> { //! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); -//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?; +//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple(Tokio)?; //! //! tracer.in_span("doing_work", |cx| { //! // Traced app logic here...