Skip to content

Commit

Permalink
feat: add HttpClientError
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp committed Nov 26, 2020
1 parent 23823d0 commit eea7237
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 22 deletions.
4 changes: 4 additions & 0 deletions opentelemetry/src/api/mod.rs
Expand Up @@ -30,12 +30,16 @@ pub enum OpenTelemetryError {
Other(String),
}

#[cfg(feature = "trace")]
#[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
impl From<TraceError> for OpenTelemetryError {
fn from(err: TraceError) -> Self {
OpenTelemetryError::TraceErr(err)
}
}

#[cfg(feature = "metrics")]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
impl From<MetricsError> for OpenTelemetryError {
fn from(err: MetricsError) -> Self {
OpenTelemetryError::MetricErr(err)
Expand Down
104 changes: 93 additions & 11 deletions opentelemetry/src/exporter/trace/mod.rs
Expand Up @@ -12,6 +12,8 @@ use serde::{Deserialize, Serialize};
#[cfg(all(feature = "http", feature = "reqwest"))]
use std::convert::TryInto;
use std::fmt::Debug;
#[cfg(feature = "http")]
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use std::time::SystemTime;

Expand All @@ -28,10 +30,6 @@ pub trait ExportError: std::error::Error + Send + Sync + 'static {
}
}

#[cfg(all(feature = "reqwest", feature = "http"))]
#[async_trait]
impl ExportError for reqwest::Error {}

/// `SpanExporter` defines the interface that protocol-specific exporters must
/// implement so that they can be plugged into OpenTelemetry SDK and support
/// sending of telemetry data.
Expand Down Expand Up @@ -81,6 +79,65 @@ pub trait HttpClient: Debug + Send + Sync {
async fn send(&self, request: Request<Vec<u8>>) -> ExportResult;
}

/// Error when sending http requests visa HttpClient implementation
#[cfg(feature = "http")]
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
#[derive(Debug)]
pub enum HttpClientError {
/// Errors from reqwest
#[cfg(all(feature = "reqwest", feature = "http"))]
ReqwestError(reqwest::Error),

/// Errors from surf
#[cfg(all(feature = "surf", feature = "http"))]
SurfError(surf::Error),

/// Other errors
Other(String),
}

#[cfg(feature = "http")]
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
impl std::error::Error for HttpClientError {}

#[cfg(feature = "http")]
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
impl ExportError for HttpClientError {}

#[cfg(feature = "http")]
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
impl Display for HttpClientError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
#[cfg(all(feature = "reqwest", feature = "http"))]
HttpClientError::ReqwestError(err) => {
write!(f, "error when sending requests using reqwest, {}", err)
}
#[cfg(all(feature = "surf", feature = "http"))]
HttpClientError::SurfError(err) => write!(
f,
"error when sending requests using surf, {}",
err.to_string()
),
HttpClientError::Other(msg) => write!(f, "{}", msg),
}
}
}

#[cfg(all(feature = "reqwest", feature = "http"))]
impl From<reqwest::Error> for HttpClientError {
fn from(err: reqwest::Error) -> Self {
HttpClientError::ReqwestError(err)
}
}

#[cfg(all(feature = "surf", feature = "http"))]
impl From<surf::Error> for HttpClientError {
fn from(err: surf::Error) -> Self {
HttpClientError::SurfError(err)
}
}

/// `SpanData` contains all the information collected by a `Span` and can be used
/// by exporters as a standard input.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
Expand Down Expand Up @@ -120,9 +177,15 @@ pub struct SpanData {
impl HttpClient for reqwest::Client {
async fn send(&self, request: Request<Vec<u8>>) -> ExportResult {
let _result = self
.execute(request.try_into()?)
.await?
.error_for_status()?;
.execute(
request
.try_into()
.map_err::<HttpClientError, _>(Into::into)?,
)
.await
.map_err::<HttpClientError, _>(Into::into)?
.error_for_status()
.map_err::<HttpClientError, _>(Into::into)?;
Ok(())
}
}
Expand All @@ -131,7 +194,15 @@ impl HttpClient for reqwest::Client {
#[async_trait]
impl HttpClient for reqwest::blocking::Client {
async fn send(&self, request: Request<Vec<u8>>) -> ExportResult {
let _result = self.execute(request.try_into()?)?.error_for_status()?;
let _result = self
.execute(
request
.try_into()
.map_err::<HttpClientError, _>(Into::into)?,
)
.map_err::<HttpClientError, _>(Into::into)?
.error_for_status()
.map_err::<HttpClientError, _>(Into::into)?;
Ok(())
}
}
Expand All @@ -141,17 +212,28 @@ impl HttpClient for reqwest::blocking::Client {
impl HttpClient for surf::Client {
async fn send(&self, request: Request<Vec<u8>>) -> ExportResult {
let (parts, body) = request.into_parts();
let uri = parts.uri.to_string().parse()?;
let uri = parts
.uri
.to_string()
.parse()
.map_err(|err: surf::http::url::ParseError| HttpClientError::Other(err.to_string()))?;

let req = surf::Request::builder(surf::http::Method::Post, uri)
.content_type("application/json")
.body(body);
let result = self.send(req).await?;
let result = self
.send(req)
.await
.map_err::<HttpClientError, _>(Into::into)?;

if result.status().is_success() {
Ok(())
} else {
Err(result.status().canonical_reason().into())
Err(HttpClientError::SurfError(surf::Error::from_str(
result.status(),
result.status().canonical_reason(),
))
.into())
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions opentelemetry/src/exporter/trace/stdout.rs
Expand Up @@ -31,8 +31,7 @@ use crate::{
trace::TracerProvider,
};
use async_trait::async_trait;
use serde::export::Formatter;
use std::fmt::{Debug, Display};
use std::fmt::{Debug, Display, Formatter};
use std::io::{stdout, Stdout, Write};

/// Pipeline builder
Expand Down Expand Up @@ -134,11 +133,11 @@ where
if self.pretty_print {
self.writer
.write_all(format!("{:#?}\n", span).as_bytes())
.map_err(|err| Error::from(err))?;
.map_err::<Error, _>(Into::into)?;
} else {
self.writer
.write_all(format!("{:?}\n", span).as_bytes())
.map_err(|err| Error::from(err))?;
.map_err::<Error, _>(Into::into)?;
}
}

Expand Down
4 changes: 4 additions & 0 deletions opentelemetry/src/global/error_handler.rs
Expand Up @@ -15,9 +15,13 @@ pub fn handle_error<T: Into<OpenTelemetryError>>(err: T) {
match GLOBAL_ERROR_HANDLER.read() {
Ok(handler) if handler.is_some() => (handler.as_ref().unwrap().0)(err.into()),
_ => match err.into() {
#[cfg(feature = "metrics")]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
OpenTelemetryError::MetricErr(err) => {
eprintln!("OpenTelemetry metrics error occurred {:?}", err)
}
#[cfg(feature = "trace")]
#[cfg_attr(docsrs, doc(cfg(feature = "trace")))]
OpenTelemetryError::TraceErr(err) => {
eprintln!("OpenTelemetry trace error occurred {:?}", err)
}
Expand Down
3 changes: 0 additions & 3 deletions opentelemetry/src/global/mod.rs
Expand Up @@ -129,7 +129,6 @@
//! [installing a metrics pipeline]: crate::exporter::metrics::stdout::StdoutExporterBuilder::try_init
//! [`MeterProvider`]: crate::metrics::MeterProvider

#[cfg(feature = "metrics")]
mod error_handler;
#[cfg(feature = "metrics")]
mod metrics;
Expand All @@ -138,8 +137,6 @@ mod propagation;
#[cfg(feature = "trace")]
mod trace;

#[cfg(feature = "metrics")]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
pub use error_handler::{handle_error, set_error_handler};
#[cfg(feature = "metrics")]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry/src/sdk/trace/span_processor.rs
Expand Up @@ -296,7 +296,7 @@ impl BatchSpanProcessor {
);
}
let send_result = ch.send(results);
if let Err(_) = send_result {
if send_result.is_err() {
global::handle_error(TraceError::Other("fail to send the export response from worker handle in BatchProcessor".to_string()))
}
}
Expand Down Expand Up @@ -336,7 +336,7 @@ impl BatchSpanProcessor {
}
exporter.shutdown();
let send_result = ch.send(results);
if let Err(_) = send_result {
if send_result.is_err() {
global::handle_error(TraceError::Other("fail to send the export response from worker handle in BatchProcessor".to_string()))
}
break;
Expand Down
3 changes: 1 addition & 2 deletions opentelemetry/src/testing/trace.rs
Expand Up @@ -9,8 +9,7 @@ use crate::{
KeyValue,
};
use async_trait::async_trait;
use serde::export::Formatter;
use std::fmt::Display;
use std::fmt::{Display, Formatter};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::SystemTime;

Expand Down

0 comments on commit eea7237

Please sign in to comment.