From 2d1df86e9b6ce3e048b24c6bb88bfd118146f2d2 Mon Sep 17 00:00:00 2001 From: Joe Wilm Date: Thu, 21 Apr 2022 10:49:30 -0700 Subject: [PATCH] [WIP] Implement new SpanExporter API for Jaeger Key points - decouple exporter from uploaders via channel and spawned task - some uploaders are a shared I/O resource and cannot be multiplexed - necessitates a task queue - eg, HttpClient will spawn many I/O tasks internally, AgentUploader is a single I/O resource. Different level of abstraction. - Synchronous API not supported without a Runtime argument. I updated the API to thread one through, but maybe this is undesirable. I'm also exploiting the fact in the Actix examples that it uses Tokio under the hood to pass through the Tokio runtime token. --- Cargo.toml | 18 ++--- examples/actix-udp/src/main.rs | 8 +- examples/grpc/src/client.rs | 8 +- examples/grpc/src/server.rs | 8 +- opentelemetry-jaeger/Cargo.toml | 2 + .../src/exporter/config/agent.rs | 57 +++++++------ .../src/exporter/config/collector/mod.rs | 3 +- .../src/exporter/config/mod.rs | 1 + opentelemetry-jaeger/src/exporter/mod.rs | 79 +++++++++++++------ 9 files changed, 118 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a595369927..b7eaef93f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ "opentelemetry-datadog", "opentelemetry-dynatrace", "opentelemetry-http", -# "opentelemetry-jaeger", + "opentelemetry-jaeger", "opentelemetry-otlp", "opentelemetry-prometheus", "opentelemetry-proto", @@ -16,24 +16,24 @@ members = [ "opentelemetry-stackdriver", "opentelemetry-zipkin", "opentelemetry-zpages", -# "examples/actix-http", -# "examples/actix-http-tracing", -# "examples/actix-udp", -# "examples/async", + "examples/actix-http", + "examples/actix-http-tracing", + "examples/actix-udp", + "examples/async", "examples/aws-xray", -# "examples/basic", + "examples/basic", "examples/basic-otlp", "examples/basic-otlp-with-selector", "examples/basic-otlp-http", "examples/datadog", "examples/dynatrace", "examples/external-otlp-tonic-tokio", -# "examples/grpc", + "examples/grpc", "examples/http", "examples/hyper-prometheus", -# "examples/tracing-grpc", + "examples/tracing-grpc", "examples/zipkin", -# "examples/multiple-span-processors", + "examples/multiple-span-processors", "examples/zpages" ] exclude = ["examples/external-otlp-grpcio-async-std"] diff --git a/examples/actix-udp/src/main.rs b/examples/actix-udp/src/main.rs index 2a741b92a1..cac9801fb0 100644 --- a/examples/actix-udp/src/main.rs +++ b/examples/actix-udp/src/main.rs @@ -1,14 +1,16 @@ use actix_service::Service; use actix_web::middleware::Logger; use actix_web::{web, App, HttpServer}; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{global, sdk::trace as sdktrace}; use opentelemetry::{ trace::{FutureExt, TraceContextExt, Tracer}, Key, }; +use opentelemetry_jaeger::JaegerTraceRuntime; -fn init_tracer() -> Result { +fn init_tracer(runtime: R) -> Result { opentelemetry_jaeger::new_agent_pipeline() .with_endpoint("localhost:6831") .with_service_name("trace-udp-demo") @@ -19,7 +21,7 @@ fn init_tracer() -> Result { opentelemetry::KeyValue::new("exporter", "jaeger"), ]), )) - .install_simple() + .install_simple(runtime) } async fn index() -> &'static str { @@ -34,7 +36,7 @@ async fn index() -> &'static str { async fn main() -> std::io::Result<()> { std::env::set_var("RUST_LOG", "debug"); env_logger::init(); - let _tracer = init_tracer().expect("Failed to initialise tracer."); + let _tracer = init_tracer(Tokio).expect("Failed to initialise tracer."); HttpServer::new(|| { App::new() diff --git a/examples/grpc/src/client.rs b/examples/grpc/src/client.rs index df2e65ab14..51668cd9b8 100644 --- a/examples/grpc/src/client.rs +++ b/examples/grpc/src/client.rs @@ -3,6 +3,7 @@ use hello_world::HelloRequest; use opentelemetry::global; use opentelemetry::global::shutdown_tracer_provider; use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceResult; use opentelemetry::{ propagation::Injector, @@ -10,6 +11,7 @@ use opentelemetry::{ trace::{TraceContextExt, Tracer as _}, Context, KeyValue, }; +use opentelemetry_jaeger::JaegerTraceRuntime; struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap); @@ -28,16 +30,16 @@ pub mod hello_world { tonic::include_proto!("helloworld"); } -fn tracing_init() -> TraceResult { +fn tracing_init(runtime: R) -> TraceResult { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-client") - .install_simple() + .install_simple(runtime) } #[tokio::main] async fn main() -> Result<(), Box> { - let tracer = tracing_init()?; + let tracer = tracing_init(Tokio)?; let mut client = GreeterClient::connect("http://[::1]:50051").await?; let span = tracer.start("client-request"); let cx = Context::current_with_span(span); diff --git a/examples/grpc/src/server.rs b/examples/grpc/src/server.rs index b4f02b5e2e..a454e5f7d8 100644 --- a/examples/grpc/src/server.rs +++ b/examples/grpc/src/server.rs @@ -4,12 +4,14 @@ use hello_world::greeter_server::{Greeter, GreeterServer}; use hello_world::{HelloReply, HelloRequest}; use opentelemetry::global; use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry::sdk::runtime::Tokio; use opentelemetry::trace::TraceError; use opentelemetry::{ propagation::Extractor, trace::{Span, Tracer}, KeyValue, }; +use opentelemetry_jaeger::JaegerTraceRuntime; use std::error::Error; pub mod hello_world { @@ -59,16 +61,16 @@ impl Greeter for MyGreeter { } } -fn tracing_init() -> Result { +fn tracing_init(runtime: R) -> Result { global::set_text_map_propagator(TraceContextPropagator::new()); opentelemetry_jaeger::new_agent_pipeline() .with_service_name("grpc-server") - .install_simple() + .install_simple(runtime) } #[tokio::main] async fn main() -> Result<(), Box> { - let _tracer = tracing_init()?; + let _tracer = tracing_init(Tokio)?; let addr = "[::1]:50051".parse()?; let greeter = MyGreeter::default(); diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index f020cb87ca..f37308ac35 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -22,6 +22,8 @@ rustdoc-args = ["--cfg", "docsrs"] async-std = { version = "1.6", optional = true } async-trait = "0.1" base64 = { version = "0.13", optional = true } +futures = "0.3" +futures-channel = "0.3" futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true } headers = { version = "0.3.2", optional = true } http = { version = "0.2", optional = true } diff --git a/opentelemetry-jaeger/src/exporter/config/agent.rs b/opentelemetry-jaeger/src/exporter/config/agent.rs index efefc17a9b..03c81803c7 100644 --- a/opentelemetry-jaeger/src/exporter/config/agent.rs +++ b/opentelemetry-jaeger/src/exporter/config/agent.rs @@ -228,7 +228,10 @@ impl AgentPipeline { /// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline. /// /// The exporter will send each span to the agent upon the span ends. - pub fn build_simple(mut self) -> Result { + pub fn build_simple( + mut self, + runtime: R, + ) -> Result { let mut builder = sdk::trace::TracerProvider::builder(); let (config, process) = build_config_and_process( @@ -236,12 +239,13 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let exporter = Exporter::new( + let (exporter, task) = Exporter::new( process.into(), self.transformation_config.export_instrument_library, self.build_sync_agent_uploader()?, ); + runtime.spawn(Box::pin(task)); builder = builder.with_simple_exporter(exporter); builder = builder.with_config(config); @@ -275,7 +279,9 @@ impl AgentPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_async_agent_uploader(runtime.clone())?; - let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); @@ -287,8 +293,11 @@ impl AgentPipeline { /// tracer provider. /// /// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate. - pub fn install_simple(self) -> Result { - let tracer_provider = self.build_simple()?; + pub fn install_simple( + self, + runtime: R, + ) -> Result { + let tracer_provider = self.build_simple(runtime)?; install_tracer_provider_and_get_tracer(tracer_provider) } @@ -321,28 +330,27 @@ impl AgentPipeline { self.trace_config.take(), self.transformation_config.service_name.take(), ); - let uploader = self.build_async_agent_uploader(runtime)?; - Ok(Exporter::new( - process.into(), - export_instrument_library, - uploader, - )) + let uploader = self.build_async_agent_uploader(runtime.clone())?; + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + + runtime.spawn(Box::pin(task)); + Ok(exporter) } /// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime. - pub fn build_sync_agent_exporter(mut self) -> Result { - let builder = sdk::trace::TracerProvider::builder(); - let (_, process) = build_config_and_process( - builder.sdk_provided_resource(), - self.trace_config.take(), - self.transformation_config.service_name.take(), - ); - Ok(Exporter::new( - process.into(), - self.transformation_config.export_instrument_library, - self.build_sync_agent_uploader()?, - )) - } + // pub fn build_sync_agent_exporter(mut self) -> Result { + // let builder = sdk::trace::TracerProvider::builder(); + // let (_, process) = build_config_and_process( + // builder.sdk_provided_resource(), + // self.trace_config.take(), + // self.transformation_config.service_name.take(), + // ); + // Ok(Exporter::new( + // process.into(), + // self.transformation_config.export_instrument_library, + // self.build_sync_agent_uploader()?, + // )) + // } fn build_async_agent_uploader(self, runtime: R) -> Result, TraceError> where @@ -358,6 +366,7 @@ impl AgentPipeline { Ok(Box::new(AsyncUploader::Agent(agent))) } + #[allow(dead_code)] // TODO jwilm fn build_sync_agent_uploader(self) -> Result, TraceError> { let agent = AgentSyncClientUdp::new( self.agent_endpoint?.as_slice(), diff --git a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs index 205c142695..df9ff23891 100644 --- a/opentelemetry-jaeger/src/exporter/config/collector/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/collector/mod.rs @@ -412,8 +412,9 @@ impl CollectorPipeline { self.transformation_config.service_name.take(), ); let uploader = self.build_uploader::()?; - let exporter = Exporter::new(process.into(), export_instrument_library, uploader); + let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader); + runtime.spawn(Box::pin(task)); builder = builder.with_batch_exporter(exporter, runtime); builder = builder.with_config(config); diff --git a/opentelemetry-jaeger/src/exporter/config/mod.rs b/opentelemetry-jaeger/src/exporter/config/mod.rs index a363c0270a..039dcc1a07 100644 --- a/opentelemetry-jaeger/src/exporter/config/mod.rs +++ b/opentelemetry-jaeger/src/exporter/config/mod.rs @@ -153,6 +153,7 @@ mod tests { // OTEL_SERVICE_NAME env var also works env::set_var("OTEL_SERVICE_NAME", "test service"); let builder = new_agent_pipeline(); + // TODO jwilm let exporter = builder.build_sync_agent_exporter().unwrap(); assert_eq!(exporter.process.service_name, "test service"); env::set_var("OTEL_SERVICE_NAME", "") diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 0d64a739ea..5500505df6 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -18,8 +18,11 @@ use std::convert::TryFrom; use self::runtime::JaegerTraceRuntime; use self::thrift::jaeger; -use async_trait::async_trait; +use futures::channel::{mpsc, oneshot}; +use futures::future::BoxFuture; +use futures::StreamExt; use std::convert::TryInto; +use std::future::Future; #[cfg(feature = "isahc_collector_client")] #[allow(unused_imports)] // this is actually used to configure authentication @@ -45,10 +48,7 @@ const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version"; /// Jaeger span exporter #[derive(Debug)] pub struct Exporter { - process: jaeger::Process, - /// Whether or not to export instrumentation information. - export_instrumentation_lib: bool, - uploader: Box, + tx: mpsc::Sender<(Vec, oneshot::Sender)>, } impl Exporter { @@ -56,11 +56,51 @@ impl Exporter { process: jaeger::Process, export_instrumentation_lib: bool, uploader: Box, - ) -> Exporter { - Exporter { - process, - export_instrumentation_lib, - uploader, + ) -> (Exporter, impl Future) { + let (tx, rx) = futures::channel::mpsc::channel(64); + ( + Exporter { tx }, + ExporterTask { + rx, + process, + export_instrumentation_lib, + uploader, + } + .run(), + ) + } +} + +struct ExporterTask { + rx: mpsc::Receiver<(Vec, oneshot::Sender)>, + process: jaeger::Process, + /// Whether or not to export instrumentation information. + export_instrumentation_lib: bool, + uploader: Box, +} + +impl ExporterTask { + async fn run(mut self) { + // TODO jwilm: this might benefit from a ExporterMessage so that we can + // send Shutdown and break the loop. + while let Some((batch, tx)) = self.rx.next().await { + let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); + let process = self.process.clone(); + + for span in batch.into_iter() { + jaeger_spans.push(convert_otel_span_into_jaeger_span( + span, + self.export_instrumentation_lib, + )); + } + + let res = self + .uploader + .upload(jaeger::Batch::new(process, jaeger_spans)) + .await; + + // TODO jwilm: is ignoring the err (fail to send) correct here? + let _ = tx.send(res); } } } @@ -74,23 +114,16 @@ pub struct Process { pub tags: Vec, } -#[async_trait] impl trace::SpanExporter for Exporter { /// Export spans to Jaeger - async fn export(&mut self, batch: Vec) -> trace::ExportResult { - let mut jaeger_spans: Vec = Vec::with_capacity(batch.len()); - let process = self.process.clone(); - - for span in batch.into_iter() { - jaeger_spans.push(convert_otel_span_into_jaeger_span( - span, - self.export_instrumentation_lib, - )); + fn export(&mut self, batch: Vec) -> BoxFuture<'static, trace::ExportResult> { + let (tx, rx) = oneshot::channel(); + + if let Err(err) = self.tx.try_send((batch, tx)) { + return Box::pin(futures::future::ready(Err(Into::into(err)))); } - self.uploader - .upload(jaeger::Batch::new(process, jaeger_spans)) - .await + Box::pin(async move { rx.await? }) } }