Skip to content

Commit

Permalink
feat: add HttpClientError
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp committed Nov 26, 2020
1 parent 23823d0 commit 25ae3ec
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 16 deletions.
103 changes: 91 additions & 12 deletions opentelemetry/src/exporter/trace/mod.rs
Expand Up @@ -7,11 +7,12 @@ use crate::{
use async_trait::async_trait;
#[cfg(feature = "http")]
use http::Request;
use serde::export::Formatter;
#[cfg(feature = "serialize")]
use serde::{Deserialize, Serialize};
#[cfg(all(feature = "http", feature = "reqwest"))]
use std::convert::TryInto;
use std::fmt::Debug;
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::time::SystemTime;

Expand All @@ -28,10 +29,6 @@ pub trait ExportError: std::error::Error + Send + Sync + 'static {
}
}

#[cfg(all(feature = "reqwest", feature = "http"))]
#[async_trait]
impl ExportError for reqwest::Error {}

/// `SpanExporter` defines the interface that protocol-specific exporters must
/// implement so that they can be plugged into OpenTelemetry SDK and support
/// sending of telemetry data.
Expand Down Expand Up @@ -81,6 +78,63 @@ pub trait HttpClient: Debug + Send + Sync {
async fn send(&self, request: Request<Vec<u8>>) -> ExportResult;
}

/// Error when sending http requests visa HttpClient implementation
#[cfg(feature = "http")]
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
#[derive(Debug)]
pub enum HttpClientError {
/// Errors from reqwest
#[cfg(all(feature = "reqwest", feature = "http"))]
ReqwestError(reqwest::Error),

/// Errors from surf
#[cfg(all(feature = "surf", feature = "http"))]
SurfError(surf::Error),

/// Other errors
Other(String),
}

impl std::error::Error for HttpClientError {}

#[cfg(feature = "http")]
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
impl ExportError for HttpClientError {}

#[cfg(feature = "http")]
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
impl Display for HttpClientError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
#[cfg(all(feature = "reqwest", feature = "http"))]
HttpClientError::ReqwestError(err) => {
write!(f, "error when sending requests using reqwest, {}", err)
}
#[cfg(all(feature = "surf", feature = "http"))]
HttpClientError::SurfError(err) => write!(
f,
"error when sending requests using surf, {}",
err.to_string()
),
HttpClientError::Other(msg) => write!(f, "{}", msg),
}
}
}

#[cfg(all(feature = "reqwest", feature = "http"))]
impl From<reqwest::Error> for HttpClientError {
fn from(err: reqwest::Error) -> Self {
HttpClientError::ReqwestError(err)
}
}

#[cfg(all(feature = "surf", feature = "http"))]
impl From<surf::Error> for HttpClientError {
fn from(err: surf::Error) -> Self {
HttpClientError::SurfError(err)
}
}

/// `SpanData` contains all the information collected by a `Span` and can be used
/// by exporters as a standard input.
#[cfg_attr(feature = "serialize", derive(Deserialize, Serialize))]
Expand Down Expand Up @@ -120,9 +174,15 @@ pub struct SpanData {
impl HttpClient for reqwest::Client {
async fn send(&self, request: Request<Vec<u8>>) -> ExportResult {
let _result = self
.execute(request.try_into()?)
.await?
.error_for_status()?;
.execute(
request
.try_into()
.map_err::<HttpClientError, _>(Into::into)?,
)
.await
.map_err::<HttpClientError, _>(Into::into)?
.error_for_status()
.map_err::<HttpClientError, _>(Into::into)?;
Ok(())
}
}
Expand All @@ -131,7 +191,15 @@ impl HttpClient for reqwest::Client {
#[async_trait]
impl HttpClient for reqwest::blocking::Client {
async fn send(&self, request: Request<Vec<u8>>) -> ExportResult {
let _result = self.execute(request.try_into()?)?.error_for_status()?;
let _result = self
.execute(
request
.try_into()
.map_err::<HttpClientError, _>(Into::into)?,
)
.map_err::<HttpClientError, _>(Into::into)?
.error_for_status()
.map_err::<HttpClientError, _>(Into::into)?;
Ok(())
}
}
Expand All @@ -141,17 +209,28 @@ impl HttpClient for reqwest::blocking::Client {
impl HttpClient for surf::Client {
async fn send(&self, request: Request<Vec<u8>>) -> ExportResult {
let (parts, body) = request.into_parts();
let uri = parts.uri.to_string().parse()?;
let uri = parts
.uri
.to_string()
.parse()
.map_err(|err: surf::http::url::ParseError| HttpClientError::Other(err.to_string()))?;

let req = surf::Request::builder(surf::http::Method::Post, uri)
.content_type("application/json")
.body(body);
let result = self.send(req).await?;
let result = self
.send(req)
.await
.map_err::<HttpClientError, _>(Into::into)?;

if result.status().is_success() {
Ok(())
} else {
Err(result.status().canonical_reason().into())
Err(HttpClientError::SurfError(surf::Error::from_str(
result.status(),
result.status().canonical_reason(),
))
.into())
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry/src/exporter/trace/stdout.rs
Expand Up @@ -134,11 +134,11 @@ where
if self.pretty_print {
self.writer
.write_all(format!("{:#?}\n", span).as_bytes())
.map_err(|err| Error::from(err))?;
.map_err::<Error, _>(Into::into)?;
} else {
self.writer
.write_all(format!("{:?}\n", span).as_bytes())
.map_err(|err| Error::from(err))?;
.map_err::<Error, _>(Into::into)?;
}
}

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry/src/sdk/trace/span_processor.rs
Expand Up @@ -296,7 +296,7 @@ impl BatchSpanProcessor {
);
}
let send_result = ch.send(results);
if let Err(_) = send_result {
if send_result.is_err() {
global::handle_error(TraceError::Other("fail to send the export response from worker handle in BatchProcessor".to_string()))
}
}
Expand Down Expand Up @@ -336,7 +336,7 @@ impl BatchSpanProcessor {
}
exporter.shutdown();
let send_result = ch.send(results);
if let Err(_) = send_result {
if send_result.is_err() {
global::handle_error(TraceError::Other("fail to send the export response from worker handle in BatchProcessor".to_string()))
}
break;
Expand Down

0 comments on commit 25ae3ec

Please sign in to comment.