Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit error handling in trace #371

Merged
merged 19 commits into from Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions CONTRIBUTING.md
Expand Up @@ -93,6 +93,13 @@ patterns in the spec.
For a deeper discussion, see:
https://github.com/open-telemetry/opentelemetry-specification/issues/165

### Error Handling
Currently, the Opentelemetry Rust SDK has two ways to handle errors. In the situation where errors are not allowed to return. One should call global error handler to process the errors. Otherwise, one should return the errors.

The Opentelemetry Rust SDK comes with an error type `openetelemetry::Error`. For different function, one error has been defined. All error returned by trace module MUST be wrapped in `opentelemetry::api::trace::TraceError`. All errors returned by metrics module MUST be wrapped in `opentelemetry::api::trace::MetricsError`.

For users that want to implement their own exporters. It's RECOMMENDED to wrap all errors from the exporter into a crate-level error type, and implement `ExporterError` trait.

## Style Guide

* Run `cargo clippy --all` - this will catch common mistakes and improve
Expand Down
7 changes: 2 additions & 5 deletions examples/actix-http/src/main.rs
@@ -1,17 +1,14 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Key,
};
use std::error::Error;

fn init_tracer() -> Result<
(sdktrace::Tracer, opentelemetry_jaeger::Uninstall),
Box<dyn Error + Send + Sync + 'static>,
> {
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_collector_endpoint("http://127.0.0.1:14268/api/traces")
.with_service_name("trace-http-demo")
Expand Down
7 changes: 2 additions & 5 deletions examples/actix-udp/src/main.rs
@@ -1,17 +1,14 @@
use actix_service::Service;
use actix_web::middleware::Logger;
use actix_web::{web, App, HttpServer};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{FutureExt, TraceContextExt, Tracer},
Key,
};
use std::error::Error;

fn init_tracer() -> Result<
(sdktrace::Tracer, opentelemetry_jaeger::Uninstall),
Box<dyn Error + Send + Sync + 'static>,
> {
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint("localhost:6831")
.with_service_name("trace-udp-demo")
Expand Down
6 changes: 2 additions & 4 deletions examples/async/src/main.rs
Expand Up @@ -17,6 +17,7 @@
//! cargo run --example async_fn
//!
//! [`hello_world`]: https://github.com/tokio-rs/tokio/blob/132e9f1da5965530b63554d7a1c59824c3de4e30/tokio/examples/hello_world.rs
use opentelemetry::trace::TraceError;
use opentelemetry::{
global,
sdk::trace as sdktrace,
Expand Down Expand Up @@ -52,10 +53,7 @@ async fn run(addr: &SocketAddr) -> io::Result<usize> {
write(&mut stream).with_context(cx).await
}

fn init_tracer() -> Result<
(sdktrace::Tracer, opentelemetry_jaeger::Uninstall),
Box<dyn Error + Send + Sync + 'static>,
> {
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_service_name("trace-demo")
.install()
Expand Down
5 changes: 2 additions & 3 deletions examples/basic-otlp/src/main.rs
@@ -1,6 +1,7 @@
use futures::stream::{Stream, StreamExt};
use opentelemetry::exporter;
use opentelemetry::sdk::metrics::PushController;
use opentelemetry::trace::TraceError;
use opentelemetry::{
baggage::BaggageExt,
metrics::{self, MetricsError, ObserverResult},
Expand All @@ -11,9 +12,7 @@ use opentelemetry::{global, sdk::trace as sdktrace};
use std::error::Error;
use std::time::Duration;

fn init_tracer(
) -> Result<(sdktrace::Tracer, opentelemetry_otlp::Uninstall), Box<dyn Error + Send + Sync + 'static>>
{
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_otlp::Uninstall), TraceError> {
opentelemetry_otlp::new_pipeline().install()
}

Expand Down
6 changes: 2 additions & 4 deletions examples/basic/src/main.rs
Expand Up @@ -2,6 +2,7 @@ use futures::stream::{Stream, StreamExt};
use opentelemetry::exporter;
use opentelemetry::global;
use opentelemetry::sdk::{metrics::PushController, trace as sdktrace};
use opentelemetry::trace::TraceError;
use opentelemetry::{
baggage::BaggageExt,
metrics::{self, MetricsError, ObserverResult},
Expand All @@ -11,10 +12,7 @@ use opentelemetry::{
use std::error::Error;
use std::time::Duration;

fn init_tracer() -> Result<
(sdktrace::Tracer, opentelemetry_jaeger::Uninstall),
Box<dyn Error + Send + Sync + 'static>,
> {
fn init_tracer() -> Result<(sdktrace::Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_service_name("trace-demo")
.with_tags(vec![
Expand Down
6 changes: 2 additions & 4 deletions examples/grpc/src/client.rs
Expand Up @@ -2,19 +2,17 @@ use hello_world::greeter_client::GreeterClient;
use hello_world::HelloRequest;
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::trace::TraceError;
use opentelemetry::{
trace::{TraceContextExt, Tracer},
Context, KeyValue,
};
use std::error::Error;

pub mod hello_world {
tonic::include_proto!("helloworld");
}

fn tracing_init(
) -> Result<(impl Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error + Send + Sync + 'static>>
{
fn tracing_init() -> Result<(impl Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_pipeline()
.with_service_name("grpc-client")
Expand Down
5 changes: 2 additions & 3 deletions examples/grpc/src/server.rs
Expand Up @@ -4,6 +4,7 @@ use hello_world::greeter_server::{Greeter, GreeterServer};
use hello_world::{HelloReply, HelloRequest};
use opentelemetry::global;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::trace::TraceError;
use opentelemetry::{
trace::{Span, Tracer},
KeyValue,
Expand Down Expand Up @@ -36,9 +37,7 @@ impl Greeter for MyGreeter {
}
}

fn tracing_init(
) -> Result<(impl Tracer, opentelemetry_jaeger::Uninstall), Box<dyn Error + Send + Sync + 'static>>
{
fn tracing_init() -> Result<(impl Tracer, opentelemetry_jaeger::Uninstall), TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_pipeline()
.with_service_name("grpc-server")
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-contrib/Cargo.toml
Expand Up @@ -22,7 +22,7 @@ rustdoc-args = ["--cfg", "docsrs"]
default = []
base64_format = ["base64", "binary_propagator"]
binary_propagator = []
datadog = ["indexmap", "rmp", "async-trait"]
datadog = ["indexmap", "rmp", "async-trait", "thiserror"]
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry/reqwest"]
reqwest-client = ["reqwest", "opentelemetry/reqwest"]
surf-client = ["surf", "opentelemetry/surf"]
Expand All @@ -37,6 +37,7 @@ reqwest = { version = "0.10", optional = true }
surf = { version = "2.0", optional = true }
http = "0.2"
base64 = { version = "0.13", optional = true }
thiserror = { version = "1.0", optional = true }

[dev-dependencies]
base64 = "0.13"
Expand Down
29 changes: 12 additions & 17 deletions opentelemetry-contrib/src/trace/exporter/datadog/mod.rs
Expand Up @@ -76,7 +76,7 @@
//! use opentelemetry::exporter::trace::HttpClient;
//! use opentelemetry_contrib::trace::exporter::datadog::{new_pipeline, ApiVersion};
//! use async_trait::async_trait;
//! use std::error::Error;
//! use opentelemetry_contrib::trace::exporter::datadog::Error;
//!
//! // `reqwest` and `surf` are supported through features, if you prefer an
//! // alternate http client you can add support by implementing `HttpClient` as
Expand All @@ -87,18 +87,18 @@
//! #[async_trait]
//! impl HttpClient for IsahcClient {
//! async fn send(&self, request: http::Request<Vec<u8>>) -> ExportResult {
//! let result = self.0.send_async(request).await?;
//! let result = self.0.send_async(request).await.map_err(|err| Error::Other(err.to_string()))?;
//!
//! if result.status().is_success() {
//! Ok(())
//! } else {
//! Err(result.status().as_str().into())
//! Err(Error::Other(result.status().to_string()).into())
//! }
//! }
//! }
//!
//! fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
//! let (tracer, _uninstall) = new_pipeline()
//! fn main() -> Result<(), opentelemetry::trace::TraceError> {
//! let (tracer, _uninstall) = new_pipeline()
jtescher marked this conversation as resolved.
Show resolved Hide resolved
//! .with_service_name("my_app")
//! .with_version(ApiVersion::Version05)
//! .with_agent_endpoint("http://localhost:8126")
Expand All @@ -123,14 +123,14 @@ mod intern;
mod model;

pub use model::ApiVersion;
pub use model::Error;

use async_trait::async_trait;
use http::{Method, Request, Uri};
use opentelemetry::exporter::trace;
use opentelemetry::exporter::trace::{HttpClient, SpanData};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk, trace::TracerProvider};
use std::error::Error;
use std::io;

/// Default Datadog collector endpoint
const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126";
Expand Down Expand Up @@ -211,14 +211,12 @@ impl Default for DatadogPipelineBuilder {

impl DatadogPipelineBuilder {
/// Create `ExporterConfig` struct from current `ExporterConfigBuilder`
pub fn install(
mut self,
) -> Result<(sdk::trace::Tracer, Uninstall), Box<dyn Error + Send + Sync + 'static>> {
pub fn install(mut self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> {
if let Some(client) = self.client {
let endpoint = self.agent_endpoint + self.version.path();
let exporter = DatadogExporter::new(
self.service_name.clone(),
endpoint.parse()?,
endpoint.parse().map_err::<Error, _>(Into::into)?,
self.version,
client,
);
Expand All @@ -233,11 +231,7 @@ impl DatadogPipelineBuilder {
let provider_guard = global::set_tracer_provider(provider);
Ok((tracer, Uninstall(provider_guard)))
} else {
Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"http client must be set, users can enable reqwest or surf feature to use http\
client implementation within create",
)))
Err(Error::NoHttpClient.into())
}
}

Expand Down Expand Up @@ -284,7 +278,8 @@ impl trace::SpanExporter for DatadogExporter {
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.body(data)?;
.body(data)
.map_err::<Error, _>(Into::into)?;
self.client.send(req).await
}
}
Expand Down
32 changes: 21 additions & 11 deletions opentelemetry-contrib/src/trace/exporter/datadog/model/mod.rs
@@ -1,21 +1,31 @@
use opentelemetry::exporter::trace;
use std::fmt;
use opentelemetry::exporter::{trace, ExportError};

mod v03;
mod v05;

#[derive(Debug, Clone, Copy)]
pub(crate) enum Error {
/// Wrap type for errors from opentelemetry datadog exporter
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Message pack error
#[error("message pack error")]
MessagePackError,
/// No http client founded. User should provide one or enable features
#[error("http client must be set, users can enable reqwest or surf feature to use http client implementation within create")]
NoHttpClient,
/// Http requests failed with following errors
#[error(transparent)]
RequestError(#[from] http::Error),
/// The Uri was invalid
#[error(transparent)]
InvalidUri(#[from] http::uri::InvalidUri),
/// Other errors
#[error("{0}")]
Other(String),
}

impl std::error::Error for Error {}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::MessagePackError => write!(f, "message pack error"),
}
impl ExportError for Error {
fn exporter_name(&self) -> &'static str {
"datadog"
}
}

Expand Down
1 change: 1 addition & 0 deletions opentelemetry-jaeger/Cargo.toml
Expand Up @@ -33,6 +33,7 @@ thrift = "0.13"
tokio = { version = "0.2", features = ["udp", "sync"], optional = true }
wasm-bindgen = { version = "0.2", optional = true }
wasm-bindgen-futures = { version = "0.4.18", optional = true }
thiserror = "1.0"

[dependencies.web-sys]
version = "0.3.4"
Expand Down
36 changes: 26 additions & 10 deletions opentelemetry-jaeger/src/lib.rs
Expand Up @@ -194,13 +194,14 @@ use agent::AgentAsyncClientUDP;
use async_trait::async_trait;
#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))]
use collector::CollectorAsyncClientHttp;
use opentelemetry::exporter::ExportError;
use opentelemetry::trace::TraceError;
use opentelemetry::{
exporter::trace,
global, sdk,
trace::{Event, Link, SpanKind, StatusCode, TracerProvider},
Key, KeyValue, Value,
};
use std::error::Error;
use std::{
net,
time::{Duration, SystemTime},
Expand Down Expand Up @@ -416,9 +417,7 @@ impl PipelineBuilder {
}

/// Install a Jaeger pipeline with the recommended defaults.
pub fn install(
self,
) -> Result<(sdk::trace::Tracer, Uninstall), Box<dyn Error + Send + Sync + 'static>> {
pub fn install(self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> {
let tracer_provider = self.build()?;
let tracer =
tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION")));
Expand All @@ -429,9 +428,7 @@ impl PipelineBuilder {
}

/// Build a configured `sdk::trace::TracerProvider` with the recommended defaults.
pub fn build(
mut self,
) -> Result<sdk::trace::TracerProvider, Box<dyn Error + Send + Sync + 'static>> {
pub fn build(mut self) -> Result<sdk::trace::TracerProvider, TraceError> {
let config = self.config.take();
let exporter = self.init_exporter()?;

Expand All @@ -447,7 +444,9 @@ impl PipelineBuilder {
/// Initialize a new exporter.
///
/// This is useful if you are manually constructing a pipeline.
pub fn init_exporter(self) -> Result<Exporter, Box<dyn Error + Send + Sync + 'static>> {
pub fn init_exporter(
self,
) -> Result<Exporter, Box<dyn std::error::Error + Send + Sync + 'static>> {
jtescher marked this conversation as resolved.
Show resolved Hide resolved
let export_instrumentation_lib = self.export_instrument_library;
let (process, uploader) = self.init_uploader()?;

Expand All @@ -461,15 +460,18 @@ impl PipelineBuilder {
#[cfg(not(any(feature = "collector_client", feature = "wasm_collector_client")))]
fn init_uploader(
self,
) -> Result<(Process, BatchUploader), Box<dyn Error + Send + Sync + 'static>> {
) -> Result<(Process, BatchUploader), Box<dyn std::error::Error + Send + Sync + 'static>> {
let agent = AgentAsyncClientUDP::new(self.agent_endpoint.as_slice())?;
Ok((self.process, BatchUploader::Agent(agent)))
}

#[cfg(any(feature = "collector_client", feature = "wasm_collector_client"))]
fn init_uploader(
self,
) -> Result<(Process, uploader::BatchUploader), Box<dyn Error + Send + Sync + 'static>> {
) -> Result<
(Process, uploader::BatchUploader),
Box<dyn std::error::Error + Send + Sync + 'static>,
> {
if let Some(collector_endpoint) = self.collector_endpoint {
let collector = CollectorAsyncClientHttp::new(
collector_endpoint,
Expand Down Expand Up @@ -686,3 +688,17 @@ fn events_to_logs(events: sdk::trace::EvictedQueue<Event>) -> Option<Vec<jaeger:
Some(events.into_iter().map(Into::into).collect())
}
}

/// Wrap type for errors from opentelemetry jaeger
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error from thrift agents.
#[error("thrift agent failed with {0}")]
ThriftAgentError(#[from] ::thrift::Error),
}

impl ExportError for Error {
fn exporter_name(&self) -> &'static str {
"jaeger"
}
}