Skip to content

Commit

Permalink
Change SpanExporter::export &self to &mut self (#350)
Browse files Browse the repository at this point in the history
This enforces the following part of the spec:

> Export() will never be called concurrently for the same exporter
> instance. Export() can be called again only after the current call
> returns.

None of the functions in the SpanExporter require it to be Sync. Export
is guaranteed not to be called concurrently and shutdown should only be
called once in total.

This change means the stdout exporter no longer requires a Mutex around
the Write.

Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
  • Loading branch information
frigus02 and TommyCpp committed Nov 8, 2020
1 parent fd8cf65 commit 5cf034c
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 88 deletions.
2 changes: 1 addition & 1 deletion opentelemetry-contrib/src/trace/exporter/datadog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ impl DatadogPipelineBuilder {
#[async_trait]
impl trace::SpanExporter for DatadogExporter {
/// Export spans to datadog-agent
async fn export(&self, batch: Vec<SpanData>) -> trace::ExportResult {
async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
let data = self.version.encode(&self.service_name, batch)?;
let req = Request::builder()
.method(Method::POST)
Expand Down
46 changes: 14 additions & 32 deletions opentelemetry-jaeger/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::thrift::{
use crate::transport::{TBufferChannel, TNoopChannel};
use std::fmt;
use std::net::{ToSocketAddrs, UdpSocket};
use std::sync::Mutex;
use thrift::{
protocol::{TCompactInputProtocol, TCompactOutputProtocol},
transport::{ReadHalf, TIoChannel, WriteHalf},
Expand Down Expand Up @@ -37,10 +36,10 @@ pub(crate) struct AgentAsyncClientUDP {
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
conn: UdpSocket,
#[cfg(feature = "tokio")]
conn: tokio::sync::Mutex<tokio::net::UdpSocket>,
conn: tokio::net::UdpSocket,
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
conn: async_std::sync::Mutex<async_std::net::UdpSocket>,
buffer_client: Mutex<BufferClient>,
conn: async_std::net::UdpSocket,
buffer_client: BufferClient,
}

impl AgentAsyncClientUDP {
Expand All @@ -59,33 +58,18 @@ impl AgentAsyncClientUDP {
#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
conn,
#[cfg(feature = "tokio")]
conn: tokio::sync::Mutex::new(tokio::net::UdpSocket::from_std(conn)?),
conn: tokio::net::UdpSocket::from_std(conn)?,
#[cfg(all(feature = "async-std", not(feature = "tokio")))]
conn: async_std::sync::Mutex::new(async_std::net::UdpSocket::from(conn)),
buffer_client: Mutex::new(BufferClient { buffer, client }),
conn: async_std::net::UdpSocket::from(conn),
buffer_client: BufferClient { buffer, client },
})
}

/// Emit standard Jaeger batch
pub(crate) async fn emit_batch(&self, batch: jaeger::Batch) -> thrift::Result<()> {
pub(crate) async fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> {
// Write payload to buffer
let payload = self
.buffer_client
.lock()
.map_err(|err| {
thrift::Error::from(std::io::Error::new(
std::io::ErrorKind::Other,
err.to_string(),
))
})
.and_then(|mut buffer_client| {
// Write to tmp buffer
buffer_client.client.emit_batch(batch)?;
// extract written payload, clearing buffer
let payload = buffer_client.buffer.take_bytes();

Ok(payload)
})?;
self.buffer_client.client.emit_batch(batch)?;
let payload = self.buffer_client.buffer.take_bytes();

// Write async to socket, reading from buffer
write_to_socket(self, payload).await?;
Expand All @@ -95,24 +79,22 @@ impl AgentAsyncClientUDP {
}

#[cfg(all(not(feature = "async-std"), not(feature = "tokio")))]
async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
client.conn.send(&payload)?;

Ok(())
}

#[cfg(feature = "tokio")]
async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
let mut conn = client.conn.lock().await;
conn.send(&payload).await?;
async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
client.conn.send(&payload).await?;

Ok(())
}

#[cfg(all(feature = "async-std", not(feature = "tokio")))]
async fn write_to_socket(client: &AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
let conn = client.conn.lock().await;
conn.send(&payload).await?;
async fn write_to_socket(client: &mut AgentAsyncClientUDP, payload: Vec<u8>) -> thrift::Result<()> {
client.conn.send(&payload).await?;

Ok(())
}
2 changes: 1 addition & 1 deletion opentelemetry-jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl Into<jaeger::Process> for Process {
#[async_trait]
impl trace::SpanExporter for Exporter {
/// Export spans to Jaeger
async fn export(&self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
let mut jaeger_spans: Vec<jaeger::Span> = Vec::with_capacity(batch.len());
let mut process = self.process.clone();

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-jaeger/src/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) enum BatchUploader {

impl BatchUploader {
/// Emit a jaeger batch for the given uploader
pub(crate) async fn upload(&self, batch: jaeger::Batch) -> trace::ExportResult {
pub(crate) async fn upload(&mut self, batch: jaeger::Batch) -> trace::ExportResult {
match self {
BatchUploader::Agent(client) => {
// TODO Implement retry behaviour
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl Exporter {

#[async_trait]
impl SpanExporter for Exporter {
async fn export(&self, batch: Vec<SpanData>) -> ExportResult {
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
let request = ExportTraceServiceRequest {
resource_spans: RepeatedField::from_vec(batch.into_iter().map(Into::into).collect()),
unknown_fields: Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-zipkin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl ZipkinPipelineBuilder {
#[async_trait]
impl trace::SpanExporter for Exporter {
/// Export spans to Zipkin collector.
async fn export(&self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
async fn export(&mut self, batch: Vec<trace::SpanData>) -> trace::ExportResult {
let zipkin_spans = batch
.into_iter()
.map(|span| model::into_zipkin_span(self.local_endpoint.clone(), span))
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry/src/api/trace/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl NoopSpanExporter {

#[async_trait]
impl SpanExporter for NoopSpanExporter {
async fn export(&self, _batch: Vec<SpanData>) -> ExportResult {
async fn export(&mut self, _batch: Vec<SpanData>) -> ExportResult {
Ok(())
}
}
Expand Down
42 changes: 20 additions & 22 deletions opentelemetry/src/exporter/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,43 @@ pub type ExportResult = Result<(), Box<dyn std::error::Error + Send + Sync + 'st
/// implement so that they can be plugged into OpenTelemetry SDK and support
/// sending of telemetry data.
///
/// The goals of the interface are:
///
/// - Minimize burden of implementation for protocol-dependent telemetry
/// exporters. The protocol exporter is expected to be primarily a simple
/// telemetry data encoder and transmitter.
/// - Allow implementing helpers as composable components that use the same
/// chainable Exporter interface. SDK authors are encouraged to implement common
/// functionality such as queuing, batching, tagging, etc. as helpers. This
/// functionality will be applicable regardless of what protocol exporter is used.
/// The goal of the interface is to minimize burden of implementation for
/// protocol-dependent telemetry exporters. The protocol exporter is expected to
/// be primarily a simple telemetry data encoder and transmitter.
#[async_trait]
pub trait SpanExporter: Send + Sync + std::fmt::Debug {
/// Exports a batch of telemetry data. Protocol exporters that will implement
/// this function are typically expected to serialize and transmit the data
/// to the destination.
pub trait SpanExporter: Send + Debug {
/// Exports a batch of readable spans. Protocol exporters that will
/// implement this function are typically expected to serialize and transmit
/// the data to the destination.
///
/// This function will never be called concurrently for the same exporter
/// instance. It can be called again only after the current call returns.
///
/// This function must not block indefinitely, there must be a reasonable
/// upper limit after which the call must time out with an error result.
async fn export(&self, batch: Vec<SpanData>) -> ExportResult;
///
/// Any retry logic that is required by the exporter is the responsibility
/// of the exporter.
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult;

/// Shuts down the exporter. Called when SDK is shut down. This is an
/// opportunity for exporter to do any cleanup required.
///
/// `shutdown` should be called only once for each Exporter instance. After
/// the call to `shutdown`, subsequent calls to `SpanExport` are not allowed
/// and should return an error.
/// This function should be called only once for each `SpanExporter`
/// instance. After the call to `shutdown`, subsequent calls to `export` are
/// not allowed and should return an error.
///
/// Shutdown should not block indefinitely (e.g. if it attempts to flush the
/// data and the destination is unavailable). SDK authors can
/// decide if they want to make the shutdown timeout to be configurable.
/// This function should not block indefinitely (e.g. if it attempts to
/// flush the data and the destination is unavailable). SDK authors
/// can decide if they want to make the shutdown timeout
/// configurable.
fn shutdown(&mut self) {}
}

/// A minimal interface necessary for export spans over HTTP.
///
/// Users sometime choose http clients that relay on certain runtime. This trait allows users to bring
/// their choice of http clients.
/// Users sometime choose http clients that relay on certain runtime. This trait
/// allows users to bring their choice of http clients.
#[cfg(feature = "http")]
#[cfg_attr(docsrs, doc(cfg(feature = "http")))]
#[async_trait]
Expand Down
17 changes: 6 additions & 11 deletions opentelemetry/src/exporter/trace/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use crate::{
};
use async_trait::async_trait;
use std::fmt::Debug;
use std::io::{self, stdout, Stdout, Write};
use std::sync::Mutex;
use std::io::{stdout, Stdout, Write};

/// Pipeline builder
#[derive(Debug)]
Expand Down Expand Up @@ -108,15 +107,15 @@ where
/// [`Stdout`]: std::io::Stdout
#[derive(Debug)]
pub struct Exporter<W: Write> {
writer: Mutex<W>,
writer: W,
pretty_print: bool,
}

impl<W: Write> Exporter<W> {
/// Create a new stdout `Exporter`.
pub fn new(writer: W, pretty_print: bool) -> Self {
Self {
writer: Mutex::new(writer),
writer,
pretty_print,
}
}
Expand All @@ -128,16 +127,12 @@ where
W: Write + Debug + Send + 'static,
{
/// Export spans to stdout
async fn export(&self, batch: Vec<SpanData>) -> ExportResult {
let mut writer = self
.writer
.lock()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
for span in batch {
if self.pretty_print {
writer.write_all(format!("{:#?}\n", span).as_bytes())?;
self.writer.write_all(format!("{:#?}\n", span).as_bytes())?;
} else {
writer.write_all(format!("{:?}\n", span).as_bytes())?;
self.writer.write_all(format!("{:?}\n", span).as_bytes())?;
}
}

Expand Down
60 changes: 44 additions & 16 deletions opentelemetry/src/sdk/trace/span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,20 @@ const OTEL_BSP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size
const OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;

/// `SpanProcessor`s allow hooks for span start and end method invocations.
/// `SpanProcessor` is an interface which allows hooks for span start and end
/// method invocations. The span processors are invoked only when is_recording
/// is true.
pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// `on_start` method is invoked when a `Span` is started.
/// `on_start` is called when a `Span` is started. This method is called
/// synchronously on the thread that started the span, therefore it should
/// not block or throw exceptions.
fn on_start(&self, span: &Span, cx: &Context);
/// `on_end` method is invoked when a `Span` is ended.
/// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
/// already set). This method is called synchronously within the `Span::end`
/// API, therefore it should not block or throw an exception.
fn on_end(&self, span: SpanData);
/// Shutdown is invoked when SDK shuts down. Use this call to cleanup any
/// processor data. No calls to `on_start` and `on_end` method is invoked
/// after `shutdown` call is made.
/// Shuts down the processor. Called when SDK is shut down. This is an
/// opportunity for processor to do any cleanup required.
fn shutdown(&mut self);
}

Expand Down Expand Up @@ -95,12 +100,14 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
/// [`SpanProcessor`]: ../../api/trace/span_processor/trait.SpanProcessor.html
#[derive(Debug)]
pub struct SimpleSpanProcessor {
exporter: Box<dyn SpanExporter>,
exporter: Mutex<Box<dyn SpanExporter>>,
}

impl SimpleSpanProcessor {
pub(crate) fn new(exporter: Box<dyn SpanExporter>) -> Self {
SimpleSpanProcessor { exporter }
SimpleSpanProcessor {
exporter: Mutex::new(exporter),
}
}
}

Expand All @@ -110,12 +117,16 @@ impl SpanProcessor for SimpleSpanProcessor {
}

fn on_end(&self, span: SpanData) {
// TODO: Surface error through global error handler
let _result = executor::block_on(self.exporter.export(vec![span]));
if let Ok(mut exporter) = self.exporter.lock() {
// TODO: Surface error through global error handler
let _result = executor::block_on(exporter.export(vec![span]));
}
}

fn shutdown(&mut self) {
self.exporter.shutdown();
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown();
}
}
}

Expand Down Expand Up @@ -433,14 +444,31 @@ where

#[cfg(test)]
mod tests {
use crate::exporter::trace::stdout;
use crate::sdk::trace::span_processor::{
OTEL_BSP_MAX_EXPORT_BATCH_SIZE, OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT,
OTEL_BSP_SCHEDULE_DELAY_MILLIS, OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
use super::{
BatchSpanProcessor, SimpleSpanProcessor, SpanProcessor, OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
OTEL_BSP_MAX_QUEUE_SIZE, OTEL_BSP_MAX_QUEUE_SIZE_DEFAULT, OTEL_BSP_SCHEDULE_DELAY_MILLIS,
OTEL_BSP_SCHEDULE_DELAY_MILLIS_DEFAULT,
};
use crate::sdk::trace::BatchSpanProcessor;
use crate::exporter::trace::stdout;
use crate::testing::trace::{new_test_export_span_data, new_test_exporter};
use std::time;

#[test]
fn simple_span_processor_on_end_calls_export() {
let (exporter, rx_export, _rx_shutdown) = new_test_exporter();
let processor = SimpleSpanProcessor::new(Box::new(exporter));
processor.on_end(new_test_export_span_data());
assert!(rx_export.try_recv().is_ok());
}

#[test]
fn simple_span_processor_shutdown_calls_shutdown() {
let (exporter, _rx_export, rx_shutdown) = new_test_exporter();
let mut processor = SimpleSpanProcessor::new(Box::new(exporter));
processor.shutdown();
assert!(rx_shutdown.try_recv().is_ok());
}

#[test]
fn test_build_batch_span_processor_from_env() {
std::env::set_var(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, "500");
Expand Down

0 comments on commit 5cf034c

Please sign in to comment.