Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit error handling in trace #371

Merged
merged 19 commits into from Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/basic-otlp/src/main.rs
Expand Up @@ -14,7 +14,9 @@ use std::time::Duration;
fn init_tracer(
) -> Result<(sdktrace::Tracer, opentelemetry_otlp::Uninstall), Box<dyn Error + Send + Sync + 'static>>
{
opentelemetry_otlp::new_pipeline().install()
opentelemetry_otlp::new_pipeline()
.install()
.map_err::<Box<dyn Error + Send + Sync + 'static>, _>(|err| Box::new(err))
TommyCpp marked this conversation as resolved.
Show resolved Hide resolved
}

// Skip first immediate tick from tokio, not needed for async_std.
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-contrib/Cargo.toml
Expand Up @@ -22,7 +22,7 @@ rustdoc-args = ["--cfg", "docsrs"]
default = []
base64_format = ["base64", "binary_propagator"]
binary_propagator = []
datadog = ["indexmap", "rmp", "async-trait"]
datadog = ["indexmap", "rmp", "async-trait", "thiserror"]
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry/reqwest"]
reqwest-client = ["reqwest", "opentelemetry/reqwest"]
surf-client = ["surf", "opentelemetry/surf"]
Expand All @@ -37,6 +37,7 @@ reqwest = { version = "0.10", optional = true }
surf = { version = "2.0", optional = true }
http = "0.2"
base64 = { version = "0.13", optional = true }
thiserror = { version = "1.0", optional = true }

[dev-dependencies]
base64 = "0.13"
Expand Down
29 changes: 12 additions & 17 deletions opentelemetry-contrib/src/trace/exporter/datadog/mod.rs
Expand Up @@ -76,7 +76,7 @@
//! use opentelemetry::exporter::trace::HttpClient;
//! use opentelemetry_contrib::trace::exporter::datadog::{new_pipeline, ApiVersion};
//! use async_trait::async_trait;
//! use std::error::Error;
//! use opentelemetry_contrib::trace::exporter::datadog::Error;
//!
//! // `reqwest` and `surf` are supported through features, if you prefer an
//! // alternate http client you can add support by implementing `HttpClient` as
Expand All @@ -87,18 +87,18 @@
//! #[async_trait]
//! impl HttpClient for IsahcClient {
//! async fn send(&self, request: http::Request<Vec<u8>>) -> ExportResult {
//! let result = self.0.send_async(request).await?;
//! let result = self.0.send_async(request).await.map_err(|err| Error::Other(err.to_string()))?;
//!
//! if result.status().is_success() {
//! Ok(())
//! } else {
//! Err(result.status().as_str().into())
//! Err(Error::Other(result.status().to_string()).into())
//! }
//! }
//! }
//!
//! fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
//! let (tracer, _uninstall) = new_pipeline()
//! fn main() -> Result<(), opentelemetry::trace::TraceError> {
//! let (tracer, _uninstall) = new_pipeline()
jtescher marked this conversation as resolved.
Show resolved Hide resolved
//! .with_service_name("my_app")
//! .with_version(ApiVersion::Version05)
//! .with_agent_endpoint("http://localhost:8126")
Expand All @@ -123,14 +123,14 @@ mod intern;
mod model;

pub use model::ApiVersion;
pub use model::Error;

use async_trait::async_trait;
use http::{Method, Request, Uri};
use opentelemetry::exporter::trace;
use opentelemetry::exporter::trace::{HttpClient, SpanData};
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk, trace::TracerProvider};
use std::error::Error;
use std::io;

/// Default Datadog collector endpoint
const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126";
Expand Down Expand Up @@ -211,14 +211,12 @@ impl Default for DatadogPipelineBuilder {

impl DatadogPipelineBuilder {
/// Create `ExporterConfig` struct from current `ExporterConfigBuilder`
pub fn install(
mut self,
) -> Result<(sdk::trace::Tracer, Uninstall), Box<dyn Error + Send + Sync + 'static>> {
pub fn install(mut self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> {
if let Some(client) = self.client {
let endpoint = self.agent_endpoint + self.version.path();
let exporter = DatadogExporter::new(
self.service_name.clone(),
endpoint.parse()?,
endpoint.parse().map_err::<Error, _>(Into::into)?,
self.version,
client,
);
Expand All @@ -233,11 +231,7 @@ impl DatadogPipelineBuilder {
let provider_guard = global::set_tracer_provider(provider);
Ok((tracer, Uninstall(provider_guard)))
} else {
Err(Box::new(io::Error::new(
io::ErrorKind::Other,
"http client must be set, users can enable reqwest or surf feature to use http\
client implementation within create",
)))
Err(Error::NoHttpClient.into())
}
}

Expand Down Expand Up @@ -284,7 +278,8 @@ impl trace::SpanExporter for DatadogExporter {
.method(Method::POST)
.uri(self.request_url.clone())
.header(http::header::CONTENT_TYPE, self.version.content_type())
.body(data)?;
.body(data)
.map_err::<Error, _>(Into::into)?;
self.client.send(req).await
}
}
Expand Down
38 changes: 36 additions & 2 deletions opentelemetry-contrib/src/trace/exporter/datadog/model/mod.rs
@@ -1,24 +1,58 @@
use http::uri::InvalidUri;
use opentelemetry::exporter::trace;
use opentelemetry::exporter::trace::ExportError;
use std::fmt;

mod v03;
mod v05;

#[derive(Debug, Clone, Copy)]
pub(crate) enum Error {
/// Wrap type for errors from opentelemetry datadog exporter
#[derive(Debug)]
pub enum Error {
/// Message pack error
MessagePackError,
/// No http client founded. User should provide one or enbale features
NoHttpClient,
/// Http requests failed with following errors
RequestError(http::Error),
/// The Uri was invalid.
InvalidUri(http::uri::InvalidUri),
/// Other errors
Other(String),
}

impl std::error::Error for Error {}

impl ExportError for Error {
fn exporter_name(&self) -> &'static str {
"datadog"
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::MessagePackError => write!(f, "message pack error"),
Error::NoHttpClient => write!(f, "http client must be set, users can enable reqwest or surf feature to use http client implementation within create"),
TommyCpp marked this conversation as resolved.
Show resolved Hide resolved
Error::RequestError(err) => write!(f, "{}", err),
Error::InvalidUri(err) => write!(f, "{}", err),
Error::Other(msg) => write!(f, "{}", msg)
}
}
}

impl From<http::uri::InvalidUri> for Error {
fn from(err: InvalidUri) -> Self {
Error::InvalidUri(err)
}
}

impl From<http::Error> for Error {
fn from(err: http::Error) -> Self {
Error::RequestError(err)
}
}

impl From<rmp::encode::ValueWriteError> for Error {
fn from(_: rmp::encode::ValueWriteError) -> Self {
Self::MessagePackError
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-jaeger/Cargo.toml
Expand Up @@ -27,6 +27,7 @@ isahc = { version = "0.9", default-features = false, optional = true }
opentelemetry = { version = "0.10", default-features = false, features = ["trace"], path = "../opentelemetry" }
thrift = "0.13"
tokio = { version = "0.2", features = ["udp", "sync"], optional = true }
thiserror = "1.0"

[features]
default = []
Expand Down
33 changes: 27 additions & 6 deletions opentelemetry-jaeger/src/lib.rs
Expand Up @@ -194,13 +194,13 @@ use agent::AgentAsyncClientUDP;
use async_trait::async_trait;
#[cfg(feature = "collector_client")]
use collector::CollectorAsyncClientHttp;
use opentelemetry::exporter::trace::ExportError;
use opentelemetry::{
exporter::trace,
global, sdk,
trace::{Event, Link, SpanKind, StatusCode, TracerProvider},
Key, KeyValue, Value,
};
use std::error::Error;
use std::{
net,
time::{Duration, SystemTime},
Expand Down Expand Up @@ -409,7 +409,8 @@ impl PipelineBuilder {
/// Install a Jaeger pipeline with the recommended defaults.
pub fn install(
self,
) -> Result<(sdk::trace::Tracer, Uninstall), Box<dyn Error + Send + Sync + 'static>> {
) -> Result<(sdk::trace::Tracer, Uninstall), Box<dyn std::error::Error + Send + Sync + 'static>>
TommyCpp marked this conversation as resolved.
Show resolved Hide resolved
{
let tracer_provider = self.build()?;
let tracer =
tracer_provider.get_tracer("opentelemetry-jaeger", Some(env!("CARGO_PKG_VERSION")));
Expand All @@ -422,7 +423,8 @@ impl PipelineBuilder {
/// Build a configured `sdk::trace::TracerProvider` with the recommended defaults.
pub fn build(
mut self,
) -> Result<sdk::trace::TracerProvider, Box<dyn Error + Send + Sync + 'static>> {
) -> Result<sdk::trace::TracerProvider, Box<dyn std::error::Error + Send + Sync + 'static>>
jtescher marked this conversation as resolved.
Show resolved Hide resolved
{
let config = self.config.take();
let exporter = self.init_exporter()?;

Expand All @@ -438,7 +440,9 @@ impl PipelineBuilder {
/// Initialize a new exporter.
///
/// This is useful if you are manually constructing a pipeline.
pub fn init_exporter(self) -> Result<Exporter, Box<dyn Error + Send + Sync + 'static>> {
pub fn init_exporter(
self,
) -> Result<Exporter, Box<dyn std::error::Error + Send + Sync + 'static>> {
jtescher marked this conversation as resolved.
Show resolved Hide resolved
let export_instrumentation_lib = self.export_instrument_library;
let (process, uploader) = self.init_uploader()?;

Expand All @@ -452,15 +456,18 @@ impl PipelineBuilder {
#[cfg(not(feature = "collector_client"))]
fn init_uploader(
self,
) -> Result<(Process, BatchUploader), Box<dyn Error + Send + Sync + 'static>> {
) -> Result<(Process, BatchUploader), Box<dyn std::error::Error + Send + Sync + 'static>> {
let agent = AgentAsyncClientUDP::new(self.agent_endpoint.as_slice())?;
Ok((self.process, BatchUploader::Agent(agent)))
}

#[cfg(feature = "collector_client")]
fn init_uploader(
self,
) -> Result<(Process, uploader::BatchUploader), Box<dyn Error + Send + Sync + 'static>> {
) -> Result<
(Process, uploader::BatchUploader),
Box<dyn std::error::Error + Send + Sync + 'static>,
> {
if let Some(collector_endpoint) = self.collector_endpoint {
let collector = CollectorAsyncClientHttp::new(
collector_endpoint,
Expand Down Expand Up @@ -677,3 +684,17 @@ fn events_to_logs(events: sdk::trace::EvictedQueue<Event>) -> Option<Vec<jaeger:
Some(events.into_iter().map(Into::into).collect())
}
}

/// Wrap type for errors from opentelemetry jaeger
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error from thrift agents.
#[error("thrift agent failed with {0}")]
ThriftAgentError(#[from] ::thrift::Error),
}

impl ExportError for Error {
fn exporter_name(&self) -> &'static str {
"jaeger"
}
}
12 changes: 9 additions & 3 deletions opentelemetry-jaeger/src/uploader.rs
Expand Up @@ -20,14 +20,20 @@ impl BatchUploader {
match self {
BatchUploader::Agent(client) => {
// TODO Implement retry behaviour
client.emit_batch(batch).await?;
client
.emit_batch(batch)
.await
.map_err::<crate::Error, _>(Into::into)?;
}
#[cfg(feature = "collector_client")]
BatchUploader::Collector(collector) => {
// TODO Implement retry behaviour
collector.submit_batch(batch).await?;
collector
.submit_batch(batch)
.await
.map_err::<crate::Error, _>(Into::into)?;
}
};
}
Ok(())
}
}
1 change: 1 addition & 0 deletions opentelemetry-otlp/Cargo.toml
Expand Up @@ -26,6 +26,7 @@ futures = "0.3"
grpcio = "0.6"
opentelemetry = { version = "0.10", default-features = false, features = ["trace"], path = "../opentelemetry" }
protobuf = "2.18"
thiserror = "1.0"

[features]
openssl = ["grpcio/openssl"]
Expand Down
22 changes: 18 additions & 4 deletions opentelemetry-otlp/src/lib.rs
Expand Up @@ -122,7 +122,6 @@

use opentelemetry::{global, sdk, trace::TracerProvider};
use std::collections::HashMap;
use std::error::Error;
use std::time::Duration;

#[allow(clippy::all, unreachable_pub, dead_code)]
Expand All @@ -132,6 +131,8 @@ mod span;
mod transform;

pub use crate::span::{Compression, Credentials, Exporter, ExporterConfig, Protocol};
use opentelemetry::exporter::trace::ExportError;
use opentelemetry::trace::TraceError;

/// Create a new pipeline builder with the recommended configuration.
///
Expand Down Expand Up @@ -215,9 +216,7 @@ impl OtlpPipelineBuilder {
}

/// Install the OTLP exporter pipeline with the recommended defaults.
pub fn install(
mut self,
) -> Result<(sdk::trace::Tracer, Uninstall), Box<dyn Error + Send + Sync + 'static>> {
pub fn install(mut self) -> Result<(sdk::trace::Tracer, Uninstall), TraceError> {
let exporter = Exporter::new(self.exporter_config);

let mut provider_builder = sdk::trace::TracerProvider::builder().with_exporter(exporter);
Expand All @@ -235,3 +234,18 @@ impl OtlpPipelineBuilder {
/// Uninstalls the OTLP pipeline on drop
#[derive(Debug)]
pub struct Uninstall(global::TracerProviderGuard);

/// Wrap type for errors from opentelemetry otel
#[derive(thiserror::Error, Debug)]
pub enum Error {
// FIXME: wait until https://github.com/open-telemetry/opentelemetry-rust/pull/352 merged
/// Error from grpcio module
#[error("grpcio error {0}")]
Grpcio(#[from] grpcio::Error),
}

impl ExportError for Error {
fn exporter_name(&self) -> &'static str {
"otel"
TommyCpp marked this conversation as resolved.
Show resolved Hide resolved
}
}
5 changes: 3 additions & 2 deletions opentelemetry-otlp/src/span.rs
Expand Up @@ -167,8 +167,9 @@ impl SpanExporter for Exporter {

let receiver = self
.trace_exporter
.export_async_opt(&request, call_options)?;
receiver.await?;
.export_async_opt(&request, call_options)
.map_err::<crate::Error, _>(Into::into)?;
receiver.await.map_err::<crate::Error, _>(Into::into)?;
Ok(())
}
}
1 change: 1 addition & 0 deletions opentelemetry-zipkin/Cargo.toml
Expand Up @@ -35,6 +35,7 @@ lazy_static = "1.4"
http = "0.2"
reqwest = { version = "0.10", optional = true }
surf = { version = "2.0", optional = true }
thiserror = { version = "1.0"}

[dev-dependencies]
isahc = "=0.9.6"