Skip to content

Commit

Permalink
Remove runtime from Jaeger's install_simple
Browse files Browse the repository at this point in the history
To keep the API _actually_ simple, we now leverage a thread to run the
jaeger exporter internals.
  • Loading branch information
jwilm committed May 5, 2022
1 parent 6a3d744 commit 4eb89a4
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 66 deletions.
8 changes: 3 additions & 5 deletions examples/actix-udp/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::sdk::runtime::Tokio;
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Key,
};
use opentelemetry_jaeger::JaegerTraceRuntime;

fn init_tracer<R: JaegerTraceRuntime>(runtime: R) -> Result<sdktrace::Tracer, TraceError> {
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_agent_pipeline()
.with_endpoint("localhost:6831")
.with_service_name("trace-udp-demo")
Expand All @@ -21,7 +19,7 @@ fn init_tracer<R: JaegerTraceRuntime>(runtime: R) -> Result<sdktrace::Tracer, Tr
opentelemetry::KeyValue::new("exporter", "jaeger"),
]),
))
.install_simple(runtime)
.install_simple()
}

async fn index() -> &'static str {
Expand All @@ -36,7 +34,7 @@ async fn index() -> &'static str {
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();
let _tracer = init_tracer(Tokio).expect("Failed to initialise tracer.");
let _tracer = init_tracer().expect("Failed to initialise tracer.");

HttpServer::new(|| {
App::new()
Expand Down
8 changes: 3 additions & 5 deletions examples/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ use hello_world::HelloRequest;
use opentelemetry::global;
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::runtime::Tokio;
use opentelemetry::trace::TraceResult;
use opentelemetry::{
propagation::Injector,
sdk::trace::Tracer,
trace::{TraceContextExt, Tracer as _},
Context, KeyValue,
};
use opentelemetry_jaeger::JaegerTraceRuntime;

struct MetadataMap<'a>(&'a mut tonic::metadata::MetadataMap);

Expand All @@ -30,16 +28,16 @@ pub mod hello_world {
tonic::include_proto!("helloworld");
}

fn tracing_init<R: JaegerTraceRuntime>(runtime: R) -> TraceResult<Tracer> {
fn tracing_init() -> TraceResult<Tracer> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-client")
.install_simple(runtime)
.install_simple()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let tracer = tracing_init(Tokio)?;
let tracer = tracing_init()?;
let mut client = GreeterClient::connect("http://[::1]:50051").await?;
let span = tracer.start("client-request");
let cx = Context::current_with_span(span);
Expand Down
8 changes: 3 additions & 5 deletions examples/grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::runtime::Tokio;
use opentelemetry::trace::TraceError;
use opentelemetry::{
propagation::Extractor,
trace::{Span, Tracer},
KeyValue,
};
use opentelemetry_jaeger::JaegerTraceRuntime;
use std::error::Error;

pub mod hello_world {
Expand Down Expand Up @@ -61,16 +59,16 @@ impl Greeter for MyGreeter {
}
}

fn tracing_init<R: JaegerTraceRuntime>(runtime: R) -> Result<impl Tracer, TraceError> {
fn tracing_init() -> Result<impl Tracer, TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-server")
.install_simple(runtime)
.install_simple()
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _tracer = tracing_init(Tokio)?;
let _tracer = tracing_init()?;
let addr = "[::1]:50051".parse()?;
let greeter = MyGreeter::default();

Expand Down
1 change: 1 addition & 0 deletions opentelemetry-jaeger/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async-trait = "0.1"
base64 = { version = "0.13", optional = true }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std"], optional = true }
futures-executor = "0.3"
headers = { version = "0.3.2", optional = true }
http = { version = "0.2", optional = true }
isahc = { version = "1.4", default-features = false, optional = true }
Expand Down
43 changes: 14 additions & 29 deletions opentelemetry-jaeger/src/exporter/config/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,23 +228,19 @@ impl AgentPipeline {
/// Build a `TracerProvider` using a blocking exporter and configurations from the pipeline.
///
/// The exporter will send each span to the agent upon the span ends.
pub fn build_simple<R: JaegerTraceRuntime>(
mut self,
runtime: R,
) -> Result<TracerProvider, TraceError> {
pub fn build_simple(mut self) -> Result<TracerProvider, TraceError> {
let mut builder = sdk::trace::TracerProvider::builder();

let (config, process) = build_config_and_process(
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let (exporter, task) = Exporter::new(
let exporter = Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
);

runtime.spawn(Box::pin(task));
builder = builder.with_simple_exporter(exporter);
builder = builder.with_config(config);

Expand Down Expand Up @@ -277,9 +273,7 @@ impl AgentPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);

builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);
Expand All @@ -291,11 +285,8 @@ impl AgentPipeline {
/// tracer provider.
///
/// The tracer name is `opentelemetry-jaeger`. The tracer version will be the version of this crate.
pub fn install_simple<R: JaegerTraceRuntime>(
self,
runtime: R,
) -> Result<sdk::trace::Tracer, TraceError> {
let tracer_provider = self.build_simple(runtime)?;
pub fn install_simple(self) -> Result<sdk::trace::Tracer, TraceError> {
let tracer_provider = self.build_simple()?;
install_tracer_provider_and_get_tracer(tracer_provider)
}

Expand Down Expand Up @@ -326,30 +317,25 @@ impl AgentPipeline {
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let uploader = self.build_async_agent_uploader(runtime.clone())?;
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));
Ok(exporter)
let uploader = self.build_async_agent_uploader(runtime)?;
Ok(Exporter::new(
process.into(),
export_instrument_library,
uploader,
))
}

/// Build an jaeger exporter targeting a jaeger agent and running on the sync runtime.
pub fn build_sync_agent_exporter<R: JaegerTraceRuntime>(
mut self,
runtime: R,
) -> Result<crate::Exporter, TraceError> {
pub fn build_sync_agent_exporter(mut self) -> Result<crate::Exporter, TraceError> {
let (_, process) = build_config_and_process(
self.trace_config.take(),
self.transformation_config.service_name.take(),
);
let (exporter, task) = Exporter::new(
Ok(Exporter::new(
process.into(),
self.transformation_config.export_instrument_library,
self.build_sync_agent_uploader()?,
);

runtime.spawn(Box::pin(task));
Ok(exporter)
))
}

fn build_async_agent_uploader<R>(self, runtime: R) -> Result<Box<dyn Uploader>, TraceError>
Expand All @@ -366,7 +352,6 @@ impl AgentPipeline {
Ok(Box::new(AsyncUploader::Agent(agent)))
}

#[allow(dead_code)] // TODO jwilm
fn build_sync_agent_uploader(self) -> Result<Box<dyn Uploader>, TraceError> {
let agent = AgentSyncClientUdp::new(
self.agent_endpoint?.as_slice(),
Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-jaeger/src/exporter/config/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,8 @@ impl CollectorPipeline {
self.transformation_config.service_name.take(),
);
let uploader = self.build_uploader::<R>()?;
let (exporter, task) = Exporter::new(process.into(), export_instrument_library, uploader);
let exporter = Exporter::new(process.into(), export_instrument_library, uploader);

runtime.spawn(Box::pin(task));
builder = builder.with_batch_exporter(exporter, runtime);
builder = builder.with_config(config);

Expand Down
4 changes: 1 addition & 3 deletions opentelemetry-jaeger/src/exporter/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,7 @@ mod tests {
// OTEL_SERVICE_NAME env var also works
env::set_var("OTEL_SERVICE_NAME", "test service");
let builder = new_agent_pipeline();
let exporter = builder
.build_sync_agent_exporter(opentelemetry::runtime::Tokio)
.unwrap();
let exporter = builder.build_sync_agent_exporter().unwrap();
assert_eq!(exporter.process.service_name, "test service");
env::set_var("OTEL_SERVICE_NAME", "")
}
Expand Down
28 changes: 13 additions & 15 deletions opentelemetry-jaeger/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use futures::channel::{mpsc, oneshot};
use futures::future::BoxFuture;
use futures::StreamExt;
use std::convert::TryInto;
use std::future::Future;

#[cfg(feature = "isahc_collector_client")]
#[allow(unused_imports)] // this is actually used to configure authentication
Expand Down Expand Up @@ -60,21 +59,20 @@ impl Exporter {
process: jaeger::Process,
export_instrumentation_lib: bool,
uploader: Box<dyn Uploader>,
) -> (Exporter, impl Future<Output = ()>) {
) -> Exporter {
let (tx, rx) = futures::channel::mpsc::channel(64);
(
Exporter {
tx,
process: process.clone(),
},
ExporterTask {
rx,
export_instrumentation_lib,
uploader,
process,
}
.run(),
)

let exporter_task = ExporterTask {
rx,
export_instrumentation_lib,
uploader,
process: process.clone(),
};
std::thread::spawn(move || {
futures_executor::block_on(exporter_task.run());
});

Exporter { tx, process }
}
}

Expand Down
3 changes: 1 addition & 2 deletions opentelemetry-jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
//! ```no_run
//! use opentelemetry::trace::Tracer;
//! use opentelemetry::global;
//! use opentelemetry::runtime::Tokio;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), opentelemetry::trace::TraceError> {
//! global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple(Tokio)?;
//! let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?;
//!
//! tracer.in_span("doing_work", |cx| {
//! // Traced app logic here...
Expand Down

0 comments on commit 4eb89a4

Please sign in to comment.