Skip to content

Commit

Permalink
[WIP] Implement new SpanExporter API for Jaeger
Browse files Browse the repository at this point in the history
Key points
- decouple exporter from uploaders via channel and spawned task
- some uploaders are a shared I/O resource and cannot be multiplexed
    - necessitates a task queue
    - eg, HttpClient will spawn many I/O tasks internally, AgentUploader
      is a single I/O resource. Different level of abstraction.
- Synchronous API not supported without a Runtime argument. I updated
  the API to thread one through, but maybe this is undesirable. I'm also
  exploiting the fact in the Actix examples that it uses Tokio under the
  hood to pass through the Tokio runtime token.
  • Loading branch information
jwilm committed Apr 21, 2022
1 parent cc2175d commit 2d1df86
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 66 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
Expand Up @@ -7,7 +7,7 @@ members = [
"opentelemetry-datadog",
"opentelemetry-dynatrace",
"opentelemetry-http",
# "opentelemetry-jaeger",
"opentelemetry-jaeger",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"opentelemetry-proto",
Expand All @@ -16,24 +16,24 @@ members = [
"opentelemetry-stackdriver",
"opentelemetry-zipkin",
"opentelemetry-zpages",
# "examples/actix-http",
# "examples/actix-http-tracing",
# "examples/actix-udp",
# "examples/async",
"examples/actix-http",
"examples/actix-http-tracing",
"examples/actix-udp",
"examples/async",
"examples/aws-xray",
# "examples/basic",
"examples/basic",
"examples/basic-otlp",
"examples/basic-otlp-with-selector",
"examples/basic-otlp-http",
"examples/datadog",
"examples/dynatrace",
"examples/external-otlp-tonic-tokio",
# "examples/grpc",
"examples/grpc",
"examples/http",
"examples/hyper-prometheus",
# "examples/tracing-grpc",
"examples/tracing-grpc",
"examples/zipkin",
# "examples/multiple-span-processors",
"examples/multiple-span-processors",
"examples/zpages"
]
exclude = ["examples/external-otlp-grpcio-async-std"]
8 changes: 5 additions & 3 deletions examples/actix-udp/src/main.rs
@@ -1,14 +1,16 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::sdk::runtime::Tokio;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Key,
};
use opentelemetry_jaeger::JaegerTraceRuntime;

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
fn init_tracer<R: JaegerTraceRuntime>(runtime: R) -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_agent_pipeline()
.with_endpoint("localhost:6831")
.with_service_name("trace-udp-demo")
Expand All @@ -19,7 +21,7 @@ fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry::KeyValue::new("exporter", "jaeger"),
]),
))
.install_simple()
.install_simple(runtime)
}

async fn index() -> &'static str {
Expand All @@ -34,7 +36,7 @@ async fn index() -> &'static str {
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();
let _tracer = init_tracer().expect("Failed to initialise tracer.");
let _tracer = init_tracer(Tokio).expect("Failed to initialise tracer.");

HttpServer::new(|| {
App::new()
Expand Down
8 changes: 5 additions & 3 deletions examples/grpc/src/client.rs
Expand Up @@ -3,13 +3,15 @@ use hello_world::HelloRequest;
use opentelemetry::global;
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::runtime::Tokio;
use opentelemetry::trace::TraceResult;
use opentelemetry::{
propagation::Injector,
sdk::trace::Tracer,
trace::{TraceContextExt, Tracer as _},
Context, KeyValue,
};
use opentelemetry_jaeger::JaegerTraceRuntime;

struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap);

Expand All @@ -28,16 +30,16 @@ pub mod hello_world {
tonic::include_proto!("helloworld");
}

fn tracing_init() -> TraceResult<Tracer> {
fn tracing_init<R: JaegerTraceRuntime>(runtime: R) -> TraceResult<Tracer> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-client")
.install_simple()
.install_simple(runtime)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let tracer = tracing_init()?;
let tracer = tracing_init(Tokio)?;
let mut client = GreeterClient::connect("http://[::1]:50051").await?;
let span = tracer.start("client-request");
let cx = Context::current_with_span(span);
Expand Down
8 changes: 5 additions & 3 deletions examples/grpc/src/server.rs
Expand Up @@ -4,12 +4,14 @@ use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::runtime::Tokio;
use opentelemetry::trace::TraceError;
use opentelemetry::{
propagation::Extractor,
trace::{Span, Tracer},
KeyValue,
};
use opentelemetry_jaeger::JaegerTraceRuntime;
use std::error::Error;

pub mod hello_world {
Expand Down Expand Up @@ -59,16 +61,16 @@ impl Greeter for MyGreeter {
}
}

fn tracing_init() -> Result<impl Tracer, TraceError> {
fn tracing_init<R: JaegerTraceRuntime>(runtime: R) -> Result<impl Tracer, TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-server")
.install_simple()
.install_simple(runtime)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _tracer = tracing_init()?;
let _tracer = tracing_init(Tokio)?;
let addr = "[::1]:50051".parse()?;
let greeter = MyGreeter::default();

Expand Down
2 changes: 2 additions & 0 deletions opentelemetry-jaeger/Cargo.toml
Expand Up @@ -22,6 +22,8 @@ rustdoc-args = ["--cfg", "docsrs"]
async-std = { version = "1.6", optional = true }
async-trait = "0.1"
base64 = { version = "0.13", optional = true }
futures = "0.3"
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true }
headers = { version = "0.3.2", optional = true }
http = { version = "0.2", optional = true }
Expand Down
57 changes: 33 additions & 24 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Expand Up @@ -228,20 +228,24 @@ impl AgentPipeline {
/// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline.
///
/// The exporter will send each span to the agent upon the span ends.
pub fn build_simple(mut self) -> Result<TracerProvider, TraceError> {
pub fn build_simple<R: JaegerTraceRuntime>(
mut self,
runtime: R,
) -> Result<TracerProvider, TraceError> {
let mut builder = sdk::trace::TracerProvider::builder();

let (config, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let exporter = Exporter::new(
let (exporter, task) = Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
);

runtime.spawn(Box::pin(task));
builder = builder.with_simple_exporter(exporter);
builder = builder.with_config(config);

Expand Down Expand Up @@ -275,7 +279,9 @@ impl AgentPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand All @@ -287,8 +293,11 @@ impl AgentPipeline {
/// tracer provider.
///
/// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate.
pub fn install_simple(self) -> Result<sdk::trace::Tracer, TraceError> {
let tracer_provider = self.build_simple()?;
pub fn install_simple<R: JaegerTraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let tracer_provider = self.build_simple(runtime)?;
install_tracer_provider_and_get_tracer(tracer_provider)
}

Expand Down Expand Up @@ -321,28 +330,27 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime)?;
Ok(Exporter::new(
process.into(),
export_instrument_library,
uploader,
))
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));
Ok(exporter)
}

/// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime.
pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
let builder = sdk::trace::TracerProvider::builder();
let (_, process) = build_config_and_process(
builder.sdk_provided_resource(),
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
Ok(Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
))
}
// pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
// let builder = sdk::trace::TracerProvider::builder();
// let (_, process) = build_config_and_process(
// builder.sdk_provided_resource(),
// self.trace_config.take(),
// self.transformation_config.service_name.take(),
// );
// Ok(Exporter::new(
// process.into(),
// self.transformation_config.export_instrument_library,
// self.build_sync_agent_uploader()?,
// ))
// }

fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Box<dyn Uploader>, TraceError>
where
Expand All @@ -358,6 +366,7 @@ impl AgentPipeline {
Ok(Box::new(AsyncUploader::Agent(agent)))
}

#[allow(dead_code)] // TODO jwilm
fn build_sync_agent_uploader(self) -> Result<Box<dyn Uploader>, TraceError> {
let agent = AgentSyncClientUdp::new(
self.agent_endpoint?.as_slice(),
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-jaeger/src/exporter/config/collector/mod.rs
Expand Up @@ -412,8 +412,9 @@ impl CollectorPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_uploader::<R>()?;
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));
builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);

Expand Down
1 change: 1 addition & 0 deletions opentelemetry-jaeger/src/exporter/config/mod.rs
Expand Up @@ -153,6 +153,7 @@ mod tests {
// OTEL_SERVICE_NAME env var also works
env::set_var("OTEL_SERVICE_NAME", "test service");
let builder = new_agent_pipeline();
// TODO jwilm
let exporter = builder.build_sync_agent_exporter().unwrap();
assert_eq!(exporter.process.service_name, "test service");
env::set_var("OTEL_SERVICE_NAME", "")
Expand Down
79 changes: 56 additions & 23 deletions opentelemetry-jaeger/src/exporter/mod.rs
Expand Up @@ -18,8 +18,11 @@ use std::convert::TryFrom;

use self::runtime::JaegerTraceRuntime;
use self::thrift::jaeger;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::StreamExt;
use std::convert::TryInto;
use std::future::Future;

#[cfg(feature = "isahc_collector_client")]
#[allow(unused_imports)] // this is actually used to configure authentication
Expand All @@ -45,22 +48,59 @@ const INSTRUMENTATION_LIBRARY_VERSION: &str = "otel.library.version";
/// Jaeger span exporter
#[derive(Debug)]
pub struct Exporter {
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
tx: mpsc::Sender<(Vec<trace::SpanData>, oneshot::Sender<trace::ExportResult>)>,
}

impl Exporter {
fn new(
process: jaeger::Process,
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
) -> Exporter {
Exporter {
process,
export_instrumentation_lib,
uploader,
) -> (Exporter, impl Future<Output = ()>) {
let (tx, rx) = futures::channel::mpsc::channel(64);
(
Exporter { tx },
ExporterTask {
rx,
process,
export_instrumentation_lib,
uploader,
}
.run(),
)
}
}

struct ExporterTask {
rx: mpsc::Receiver<(Vec<trace::SpanData>, oneshot::Sender<trace::ExportResult>)>,
process: jaeger::Process,
/// Whether or not to export instrumentation information.
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
}

impl ExporterTask {
async fn run(mut self) {
// TODO jwilm: this might benefit from a ExporterMessage so that we can
// send Shutdown and break the loop.
while let Some((batch, tx)) = self.rx.next().await {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
}

let res = self
.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await;

// TODO jwilm: is ignoring the err (fail to send) correct here?
let _ = tx.send(res);
}
}
}
Expand All @@ -74,23 +114,16 @@ pub struct Process {
pub tags: Vec<KeyValue>,
}

#[async_trait]
impl trace::SpanExporter for Exporter {
/// Export spans to Jaeger
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let process = self.process.clone();

for span in batch.into_iter() {
jaeger_spans.push(convert_otel_span_into_jaeger_span(
span,
self.export_instrumentation_lib,
));
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
let (tx, rx) = oneshot::channel();

if let Err(err) = self.tx.try_send((batch, tx)) {
return Box::pin(futures::future::ready(Err(Into::into(err))));
}

self.uploader
.upload(jaeger::Batch::new(process, jaeger_spans))
.await
Box::pin(async move { rx.await? })
}
}

Expand Down

0 comments on commit 2d1df86

Please sign in to comment.