Skip to content

Commit

Permalink
Add wasm support to jaeger (#365)
Browse files Browse the repository at this point in the history
  • Loading branch information
boxdot committed Nov 28, 2020
1 parent 6249046 commit ac8e4e5
Show file tree
Hide file tree
Showing 17 changed files with 323 additions and 111 deletions.
29 changes: 29 additions & 0 deletions opentelemetry-jaeger/Cargo.toml
Expand Up @@ -22,12 +22,41 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
async-std = { version = "1.6", optional = true }
async-trait = "0.1"
base64 = { version = "0.13", optional = true }
futures-util = { version = "0.3", optional = true }
http = { version = "0.2", optional = true }
isahc = { version = "0.9", default-features = false, optional = true }
js-sys = { version = "0.3", optional = true }
opentelemetry = { version = "0.10", default-features = false, features = ["trace"], path = "../opentelemetry" }
pin-project = { version = "1.0", optional = true }
thrift = "0.13"
tokio = { version = "0.2", features = ["udp", "sync"], optional = true }
wasm-bindgen = { version = "0.2", optional = true }
wasm-bindgen-futures = { version = "0.4.18", optional = true }

[dependencies.web-sys]
version = "0.3.4"
features = [
'Headers',
'Request',
'RequestCredentials',
'RequestInit',
'RequestMode',
'Response',
'Window',
]
optional = true

[features]
default = []
collector_client = ["isahc", "http"]
wasm_collector_client = [
"base64",
"futures-util",
"http",
"js-sys",
"pin-project",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
285 changes: 222 additions & 63 deletions opentelemetry-jaeger/src/collector.rs
@@ -1,88 +1,247 @@
//! # HTTP Jaeger Collector Client
use crate::thrift::jaeger;
use http::{Request, Uri};
use isahc::{
auth::{Authentication, Credentials},
config::Configurable,
HttpClient,
};
use std::io::{self, Cursor};
use std::sync::atomic::{AtomicUsize, Ordering};
use thrift::protocol::TBinaryOutputProtocol;
use http::Uri;
use std::sync::atomic::AtomicUsize;

/// `CollectorAsyncClientHttp` implements an async version of the
/// `TCollectorSyncClient` interface over HTTP
#[derive(Debug)]
pub(crate) struct CollectorAsyncClientHttp {
endpoint: Uri,
client: HttpClient,
#[cfg(feature = "collector_client")]
client: isahc::HttpClient,
#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
client: WasmHttpClient,
payload_size_estimate: AtomicUsize,
}

impl CollectorAsyncClientHttp {
/// Create a new HTTP collector client
pub(crate) fn new(
endpoint: Uri,
username: Option<String>,
password: Option<String>,
) -> thrift::Result<Self> {
let mut builder = HttpClient::builder();
if let (Some(username), Some(password)) = (username, password) {
builder = builder
.authentication(Authentication::basic())
.credentials(Credentials::new(username, password));
#[cfg(feature = "wasm_collector_client")]
#[derive(Debug)]
struct WasmHttpClient {
auth: Option<String>,
}

#[cfg(feature = "collector_client")]
mod collector_client {
use super::*;
use crate::thrift::jaeger;
use http::{Request, Uri};
use isahc::{
auth::{Authentication, Credentials},
config::Configurable,
HttpClient,
};
use std::io::{self, Cursor};
use std::sync::atomic::{AtomicUsize, Ordering};
use thrift::protocol::TBinaryOutputProtocol;

impl CollectorAsyncClientHttp {
/// Create a new HTTP collector client
pub(crate) fn new(
endpoint: Uri,
username: Option<String>,
password: Option<String>,
) -> thrift::Result<Self> {
let mut builder = HttpClient::builder();
if let (Some(username), Some(password)) = (username, password) {
builder = builder
.authentication(Authentication::basic())
.credentials(Credentials::new(username, password));
}
let client = builder
.build()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
let payload_size_estimate = AtomicUsize::new(512);

Ok(CollectorAsyncClientHttp {
endpoint,
client,
payload_size_estimate,
})
}

/// Submit list of Jaeger batches
pub(crate) async fn submit_batch(
&self,
batch: jaeger::Batch,
) -> thrift::Result<jaeger::BatchSubmitResponse> {
// estimate transport capacity based on last request
let estimate = self.payload_size_estimate.load(Ordering::Relaxed);

// Write payload to transport buffer
let transport = Cursor::new(Vec::with_capacity(estimate));
let mut protocol = TBinaryOutputProtocol::new(transport, true);
batch.write_to_out_protocol(&mut protocol)?;

// Use current batch capacity as new estimate
self.payload_size_estimate
.store(protocol.transport.get_ref().len(), Ordering::Relaxed);

// Build collector request
let req = Request::builder()
.method("POST")
.uri(&self.endpoint)
.header("Content-Type", "application/vnd.apache.thrift.binary")
.body(protocol.transport.into_inner())
.expect("request should always be valid");

// Send request to collector
let res = self
.client
.send_async(req)
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;

if !res.status().is_success() {
return Err(thrift::Error::from(io::Error::new(
io::ErrorKind::Other,
format!("Expected success response, got {:?}", res.status()),
)));
}

Ok(jaeger::BatchSubmitResponse { ok: true })
}
let client = builder
.build()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
let payload_size_estimate = AtomicUsize::new(512);

Ok(CollectorAsyncClientHttp {
endpoint,
client,
payload_size_estimate,
})
}
}

#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
mod wasm_collector_client {
use super::*;
use crate::thrift::jaeger;
use futures_util::future;
use http::Uri;
use js_sys::Uint8Array;
use std::future::Future;
use std::io::{self, Cursor};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use thrift::protocol::TBinaryOutputProtocol;
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::JsFuture;
use web_sys::{Request, RequestCredentials, RequestInit, RequestMode, Response};

impl CollectorAsyncClientHttp {
/// Create a new HTTP collector client
pub(crate) fn new(
endpoint: Uri,
username: Option<String>,
password: Option<String>,
) -> thrift::Result<Self> {
let auth = if let (Some(username), Some(password)) = (username, password) {
let mut auth = String::from("Basic ");
base64::encode_config_buf(username, base64::STANDARD, &mut auth);
auth.push(':');
base64::encode_config_buf(password, base64::STANDARD, &mut auth);
Some(auth)
} else {
None
};
let payload_size_estimate = AtomicUsize::new(512);

Ok(Self {
endpoint,
client: WasmHttpClient { auth },
payload_size_estimate,
})
}

/// Submit list of Jaeger batches
pub(crate) async fn submit_batch(
&self,
batch: jaeger::Batch,
) -> thrift::Result<jaeger::BatchSubmitResponse> {
// estimate transport capacity based on last request
let estimate = self.payload_size_estimate.load(Ordering::Relaxed);

// Write payload to transport buffer
let transport = Cursor::new(Vec::with_capacity(estimate));
let mut protocol = TBinaryOutputProtocol::new(transport, true);
batch.write_to_out_protocol(&mut protocol)?;

// Use current batch capacity as new estimate
self.payload_size_estimate
.store(protocol.transport.get_ref().len(), Ordering::Relaxed);

// Build collector request
let req = Request::builder()
.method("POST")
.uri(&self.endpoint)
.header("Content-Type", "application/vnd.apache.thrift.binary")
.body(protocol.transport.into_inner())
.expect("request should always be valid");
/// Submit list of Jaeger batches
pub(crate) fn submit_batch(
&self,
batch: jaeger::Batch,
) -> impl Future<Output = thrift::Result<jaeger::BatchSubmitResponse>> + Send + 'static
{
self.build_request(batch)
.map(post_request)
.map(|fut| future::Either::Left(SubmitBatchFuture(fut)))
.unwrap_or_else(|e| future::Either::Right(future::err(e)))
}

fn build_request(&self, batch: jaeger::Batch) -> thrift::Result<Request> {
// estimate transport capacity based on last request
let estimate = self.payload_size_estimate.load(Ordering::Relaxed);

// Write payload to transport buffer
let transport = Cursor::new(Vec::with_capacity(estimate));
let mut protocol = TBinaryOutputProtocol::new(transport, true);
batch.write_to_out_protocol(&mut protocol)?;

// Use current batch capacity as new estimate
self.payload_size_estimate
.store(protocol.transport.get_ref().len(), Ordering::Relaxed);

// Build collector request
let mut options = RequestInit::new();
options.method("POST");
options.mode(RequestMode::Cors);

let body: Uint8Array = protocol.transport.get_ref().as_slice().into();
options.body(Some(body.as_ref()));

if self.client.auth.is_some() {
options.credentials(RequestCredentials::Include);
}

let request = Request::new_with_str_and_init(&self.endpoint.to_string(), &options)
.map_err(jsvalue_into_ioerror)?;
let headers = request.headers();
headers
.set("Content-Type", "application/vnd.apache.thrift.binary")
.map_err(jsvalue_into_ioerror)?;
if let Some(auth) = self.client.auth.as_ref() {
headers
.set("Authorization", auth)
.map_err(jsvalue_into_ioerror)?;
}

Ok(request)
}
}

async fn post_request(request: Request) -> thrift::Result<jaeger::BatchSubmitResponse> {
// Send request to collector
let res = self
.client
.send_async(req)
let window = web_sys::window().unwrap();
let res_value = JsFuture::from(window.fetch_with_request(&request))
.await
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
.map_err(jsvalue_into_ioerror)?;
let res: Response = res_value.dyn_into().unwrap();

if !res.status().is_success() {
if !res.ok() {
return Err(thrift::Error::from(io::Error::new(
io::ErrorKind::Other,
format!("Expected success response, got {:?}", res.status()),
format!(
"Expected success response, got {} ({})",
res.status(),
res.status_text()
),
)));
}

Ok(jaeger::BatchSubmitResponse { ok: true })
}

/// Wrapper of web fetch API future marked as Send.
///
/// At the moment, the web APIs are single threaded. Since all opentelemetry futures are
/// required to be Send, we mark this future as Send.
#[pin_project::pin_project]
struct SubmitBatchFuture<F>(#[pin] F);

unsafe impl<F> Send for SubmitBatchFuture<F> {}

impl<F: Future> Future for SubmitBatchFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().0.poll(cx)
}
}

fn jsvalue_into_ioerror(value: wasm_bindgen::JsValue) -> io::Error {
io::Error::new(
io::ErrorKind::Other,
js_sys::JSON::stringify(&value)
.map(String::from)
.unwrap_or_else(|_| "unknown error".to_string()),
)
}
}

0 comments on commit ac8e4e5

Please sign in to comment.