Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit error handling in trace #371

Merged
merged 19 commits into from Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 2 additions & 5 deletions examples/actix-http/src/main.rs
@@ -1,17 +1,14 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Key,
};
use std::error::Error;

fn init_tracer() -> Result<
(sdktrace::Tracer, opentelemetry_jaeger::Uninstall),
Box<dyn Error + Send + Sync + 'static>,
> {
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_collector_endpoint("http://127.0.0.1:14268/api/traces")
.with_service_name("trace-http-demo")
Expand Down
7 changes: 2 additions & 5 deletions examples/actix-udp/src/main.rs
@@ -1,17 +1,14 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Key,
};
use std::error::Error;

fn init_tracer() -> Result<
(sdktrace::Tracer, opentelemetry_jaeger::Uninstall),
Box<dyn Error + Send + Sync + 'static>,
> {
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint("localhost:6831")
.with_service_name("trace-udp-demo")
Expand Down
6 changes: 2 additions & 4 deletions examples/async/src/main.rs
Expand Up @@ -17,6 +17,7 @@
//! cargo run --example async_fn
//!
//! [`hello_world`]: https://github.com/tokio-rs/tokio/blob/132e9f1da5965530b63554d7a1c59824c3de4e30/tokio/examples/hello_world.rs
use opentelemetry::trace::TraceError;
use opentelemetry::{
global,
sdk::trace as sdktrace,
Expand Down Expand Up @@ -52,10 +53,7 @@ async fn run(addr: &SocketAddr) -> io::Result<usize> {
write(&mut stream).with_context(cx).await
}

fn init_tracer() -> Result<
(sdktrace::Tracer, opentelemetry_jaeger::Uninstall),
Box<dyn Error + Send + Sync + 'static>,
> {
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_service_name("trace-demo")
.install()
Expand Down
5 changes: 2 additions & 3 deletions examples/basic-otlp/src/main.rs
@@ -1,6 +1,7 @@
use futures::stream::{Stream, StreamExt};
use opentelemetry::exporter;
use opentelemetry::sdk::metrics::PushController;
use opentelemetry::trace::TraceError;
use opentelemetry::{
baggage::BaggageExt,
metrics::{self, MetricsError, ObserverResult},
Expand All @@ -11,9 +12,7 @@ use opentelemetry::{global, sdk::trace as sdktrace};
use std::error::Error;
use std::time::Duration;

fn init_tracer(
) -> Result<(sdktrace::Tracer, opentelemetry_otlp::Uninstall), Box<dyn Error + Send + Sync + 'static>>
{
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_otlp::Uninstall), TraceError> {
opentelemetry_otlp::new_pipeline().install()
}

Expand Down
6 changes: 2 additions & 4 deletions examples/basic/src/main.rs
Expand Up @@ -2,6 +2,7 @@ use futures::stream::{Stream, StreamExt};
use opentelemetry::exporter;
use opentelemetry::global;
use opentelemetry::sdk::{metrics::PushController, trace as sdktrace};
use opentelemetry::trace::TraceError;
use opentelemetry::{
baggage::BaggageExt,
metrics::{self, MetricsError, ObserverResult},
Expand All @@ -11,10 +12,7 @@ use opentelemetry::{
use std::error::Error;
use std::time::Duration;

fn init_tracer() -> Result<
(sdktrace::Tracer, opentelemetry_jaeger::Uninstall),
Box<dyn Error + Send + Sync + 'static>,
> {
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_service_name("trace-demo")
.with_tags(vec![
Expand Down
6 changes: 2 additions & 4 deletions examples/grpc/src/client.rs
Expand Up @@ -2,19 +2,17 @@ use hello_world::greeter_client::GreeterClient;
use hello_world::HelloRequest;
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::trace::TraceError;
use opentelemetry::{
trace::{TraceContextExt, Tracer},
Context, KeyValue,
};
use std::error::Error;

pub mod hello_world {
tonic::include_proto!("helloworld");
}

fn tracing_init(
) -> Result<(impl Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error + Send + Sync + 'static>>
{
fn tracing_init() -> Result<(impl Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_pipeline()
.with_service_name("grpc-client")
Expand Down
5 changes: 2 additions & 3 deletions examples/grpc/src/server.rs
Expand Up @@ -4,6 +4,7 @@ use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::trace::TraceError;
use opentelemetry::{
trace::{Span, Tracer},
KeyValue,
Expand Down Expand Up @@ -36,9 +37,7 @@ impl Greeter for MyGreeter {
}
}

fn tracing_init(
) -> Result<(impl Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error + Send + Sync + 'static>>
{
fn tracing_init() -> Result<(impl Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_pipeline()
.with_service_name("grpc-server")
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-contrib/Cargo.toml
Expand Up @@ -22,7 +22,7 @@ rustdoc-args = ["--cfg", "docsrs"]
default = []
base64_format = ["base64", "binary_propagator"]
binary_propagator = []
datadog = ["indexmap", "rmp", "async-trait"]
datadog = ["indexmap", "rmp", "async-trait", "thiserror"]
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry/reqwest"]
reqwest-client = ["reqwest", "opentelemetry/reqwest"]
surf-client = ["surf", "opentelemetry/surf"]
Expand All @@ -37,6 +37,7 @@ reqwest = { version = "0.10", optional = true }
surf = { version = "2.0", optional = true }
http = "0.2"
base64 = { version = "0.13", optional = true }
thiserror = { version = "1.0", optional = true }

[dev-dependencies]
base64 = "0.13"
Expand Down
29 changes: 12 additions & 17 deletions opentelemetry-contrib/src/trace/exporter/datadog/mod.rs
Expand Up @@ -76,7 +76,7 @@
//! use opentelemetry::exporter::trace::HttpClient;
//! use opentelemetry_contrib::trace::exporter::datadog::{new_pipeline, ApiVersion};
//! use async_trait::async_trait;
//! use std::error::Error;
//! use opentelemetry_contrib::trace::exporter::datadog::Error;
//!
//! // `reqwest` and `surf` are supported through features, if you prefer an
//! // alternate http client you can add support by implementing `HttpClient` as
Expand All @@ -87,18 +87,18 @@
//! #[async_trait]
//! impl HttpClient for IsahcClient {
//! async fn send(&self, request: http::Request<Vec<u8>>) -> ExportResult {
//! let result = self.0.send_async(request).await?;
//! let result = self.0.send_async(request).await.map_err(|err| Error::Other(err.to_string()))?;
//!
//! if result.status().is_success() {
//! Ok(())
//! } else {
//! Err(result.status().as_str().into())
//! Err(Error::Other(result.status().to_string()).into())
//! }
//! }
//! }
//!
//! fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
//! let (tracer, _uninstall) = new_pipeline()
//! fn main() -> Result<(), opentelemetry::trace::TraceError> {
//! let (tracer, _uninstall) = new_pipeline()
jtescher marked this conversation as resolved.
Show resolved Hide resolved
//! .with_service_name("my_app")
//! .with_version(ApiVersion::Version05)
//! .with_agent_endpoint("http://localhost:8126")
Expand All @@ -123,14 +123,14 @@ mod intern;
mod model;

pub use model::ApiVersion;
pub use model::Error;

use async_trait::async_trait;
use http::{Method, Request, Uri};
use opentelemetry::exporter::trace;
use opentelemetry::exporter::trace::{HttpClient, SpanData};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk, trace::TracerProvider};
use std::error::Error;
use std::io;

/// Default Datadog collector endpoint
const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126";
Expand Down Expand Up @@ -211,14 +211,12 @@ impl Default for DatadogPipelineBuilder {

impl DatadogPipelineBuilder {
/// Create `ExporterConfig` struct from current `ExporterConfigBuilder`
pub fn install(
mut self,
) -> Result<(sdk::trace::Tracer, Uninstall), Box<dyn Error + Send + Sync + 'static>> {
pub fn install(mut self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> {
if let Some(client) = self.client {
let endpoint = self.agent_endpoint + self.version.path();
let exporter = DatadogExporter::new(
self.service_name.clone(),
endpoint.parse()?,
endpoint.parse().map_err::<Error, _>(Into::into)?,
self.version,
client,
);
Expand All @@ -233,11 +231,7 @@ impl DatadogPipelineBuilder {
let provider_guard = global::set_tracer_provider(provider);
Ok((tracer, Uninstall(provider_guard)))
} else {
Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"http client must be set, users can enable reqwest or surf feature to use http\
client implementation within create",
)))
Err(Error::NoHttpClient.into())
}
}

Expand Down Expand Up @@ -284,7 +278,8 @@ impl trace::SpanExporter for DatadogExporter {
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.body(data)?;
.body(data)
.map_err::<Error, _>(Into::into)?;
self.client.send(req).await
}
}
Expand Down
32 changes: 21 additions & 11 deletions opentelemetry-contrib/src/trace/exporter/datadog/model/mod.rs
@@ -1,21 +1,31 @@
use opentelemetry::exporter::trace;
use std::fmt;
use opentelemetry::exporter::{trace, ExportError};

mod v03;
mod v05;

#[derive(Debug, Clone, Copy)]
pub(crate) enum Error {
/// Wrap type for errors from opentelemetry datadog exporter
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Message pack error
#[error("message pack error")]
MessagePackError,
/// No http client founded. User should provide one or enable features
#[error("http client must be set, users can enable reqwest or surf feature to use http client implementation within create")]
NoHttpClient,
/// Http requests failed with following errors
#[error(transparent)]
RequestError(#[from] http::Error),
/// The Uri was invalid
#[error(transparent)]
InvalidUri(#[from] http::uri::InvalidUri),
/// Other errors
#[error("{0}")]
Other(String),
}

impl std::error::Error for Error {}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::MessagePackError => write!(f, "message pack error"),
}
impl ExportError for Error {
fn exporter_name(&self) -> &'static str {
"datadog"
}
}

Expand Down
1 change: 1 addition & 0 deletions opentelemetry-jaeger/Cargo.toml
Expand Up @@ -33,6 +33,7 @@ thrift = "0.13"
tokio = { version = "0.2", features = ["udp", "sync"], optional = true }
wasm-bindgen = { version = "0.2", optional = true }
wasm-bindgen-futures = { version = "0.4.18", optional = true }
thiserror = "1.0"

[dependencies.web-sys]
version = "0.3.4"
Expand Down
35 changes: 27 additions & 8 deletions opentelemetry-jaeger/src/lib.rs
Expand Up @@ -194,13 +194,14 @@ use agent::AgentAsyncClientUDP;
use async_trait::async_trait;
#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))]
use collector::CollectorAsyncClientHttp;
use opentelemetry::exporter::ExportError;
use opentelemetry::trace::TraceError;
use opentelemetry::{
exporter::trace,
global, sdk,
trace::{Event, Link, SpanKind, StatusCode, TracerProvider},
Key, KeyValue, Value,
};
use std::error::Error;
use std::{
net,
time::{Duration, SystemTime},
Expand Down Expand Up @@ -416,9 +417,7 @@ impl PipelineBuilder {
}

/// Install a Jaeger pipeline with the recommended defaults.
pub fn install(
self,
) -> Result<(sdk::trace::Tracer, Uninstall), Box<dyn Error + Send + Sync + 'static>> {
pub fn install(self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> {
let tracer_provider = self.build()?;
let tracer =
tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION")));
Expand All @@ -431,7 +430,8 @@ impl PipelineBuilder {
/// Build a configured `sdk::trace::TracerProvider` with the recommended defaults.
pub fn build(
mut self,
) -> Result<sdk::trace::TracerProvider, Box<dyn Error + Send + Sync + 'static>> {
) -> Result<sdk::trace::TracerProvider, Box<dyn std::error::Error + Send + Sync + 'static>>
jtescher marked this conversation as resolved.
Show resolved Hide resolved
{
let config = self.config.take();
let exporter = self.init_exporter()?;

Expand All @@ -447,7 +447,9 @@ impl PipelineBuilder {
/// Initialize a new exporter.
///
/// This is useful if you are manually constructing a pipeline.
pub fn init_exporter(self) -> Result<Exporter, Box<dyn Error + Send + Sync + 'static>> {
pub fn init_exporter(
self,
) -> Result<Exporter, Box<dyn std::error::Error + Send + Sync + 'static>> {
jtescher marked this conversation as resolved.
Show resolved Hide resolved
let export_instrumentation_lib = self.export_instrument_library;
let (process, uploader) = self.init_uploader()?;

Expand All @@ -461,15 +463,18 @@ impl PipelineBuilder {
#[cfg(not(any(feature = "collector_client", feature = "wasm_collector_client")))]
fn init_uploader(
self,
) -> Result<(Process, BatchUploader), Box<dyn Error + Send + Sync + 'static>> {
) -> Result<(Process, BatchUploader), Box<dyn std::error::Error + Send + Sync + 'static>> {
let agent = AgentAsyncClientUDP::new(self.agent_endpoint.as_slice())?;
Ok((self.process, BatchUploader::Agent(agent)))
}

#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))]
fn init_uploader(
self,
) -> Result<(Process, uploader::BatchUploader), Box<dyn Error + Send + Sync + 'static>> {
) -> Result<
(Process, uploader::BatchUploader),
Box<dyn std::error::Error + Send + Sync + 'static>,
> {
if let Some(collector_endpoint) = self.collector_endpoint {
let collector = CollectorAsyncClientHttp::new(
collector_endpoint,
Expand Down Expand Up @@ -686,3 +691,17 @@ fn events_to_logs(events: sdk::trace::EvictedQueue<Event>) -> Option<Vec<jaeger:
Some(events.into_iter().map(Into::into).collect())
}
}

/// Wrap type for errors from opentelemetry jaeger
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error from thrift agents.
#[error("thrift agent failed with {0}")]
ThriftAgentError(#[from] ::thrift::Error),
}

impl ExportError for Error {
fn exporter_name(&self) -> &'static str {
"jaeger"
}
}