Skip to content

Commit

Permalink
Address comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp committed Nov 19, 2020
1 parent 992abb8 commit ee26cd0
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 7 deletions.
2 changes: 1 addition & 1 deletion opentelemetry/Cargo.toml
Expand Up @@ -26,7 +26,7 @@ async-trait = { version = "0.1", optional = true }
bincode = { version = "1.2", optional = true }
dashmap = { version = "4.0.0-rc6", optional = true }
fnv = { version = "1.0", optional = true }
futures = { version = "0.3", features = ["std"] }
futures = "0.3"
lazy_static = "1.4"
percent-encoding = { version = "2.0", optional = true }
pin-project = { version = "0.4", optional = true }
Expand Down
14 changes: 8 additions & 6 deletions opentelemetry/src/sdk/trace/span_processor.rs
Expand Up @@ -44,7 +44,7 @@ use futures::{
channel::mpsc, channel::oneshot, executor, future::BoxFuture, future::Either, pin_mut, Future,
FutureExt, Stream, StreamExt,
};
use std::{fmt, pin::Pin, str::FromStr, sync::Mutex, time};
use std::{fmt, str::FromStr, sync::Mutex, time};

/// Delay interval between two consecutive exports, default to be 5000.
const OTEL_BSP_SCHEDULE_DELAY_MILLIS: &str = "OTEL_BSP_SCHEDULE_DELAY_MILLIS";
Expand Down Expand Up @@ -140,7 +140,11 @@ impl SpanProcessor for SimpleSpanProcessor {
exporter.shutdown();
Ok(())
} else {
Err(TraceError::Other("When force flushing the SimpleSpanProcessor, the exporter's lock has been poisoned".into()).into())
Err(TraceError::Other(
"When shutting down the SimpleSpanProcessor, the exporter's lock has been poisoned"
.into(),
)
.into())
}
}
}
Expand Down Expand Up @@ -189,7 +193,6 @@ impl SpanProcessor for SimpleSpanProcessor {
/// [`async-std`]: https://async.rs
pub struct BatchSpanProcessor {
message_sender: Mutex<mpsc::Sender<BatchMessage>>,
_worker_handle: Option<Pin<Box<dyn Future<Output = ()> + Send + Sync>>>,
}

impl fmt::Debug for BatchSpanProcessor {
Expand Down Expand Up @@ -224,7 +227,7 @@ impl SpanProcessor for BatchSpanProcessor {
}

fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When force flushing the BatchSpanProcessor, the message sender's lock has been poisoned".into()))?;
let mut sender = self.message_sender.lock().map_err(|_| TraceError::Other("When shutting down the BatchSpanProcessor, the message sender's lock has been poisoned".into()))?;
let (res_sender, res_receiver) = oneshot::channel::<Vec<ExportResult>>();
sender.try_send(BatchMessage::Shutdown(res_sender))?;
for result in futures::executor::block_on(res_receiver)? {
Expand Down Expand Up @@ -263,7 +266,7 @@ impl BatchSpanProcessor {
let ticker = interval(config.scheduled_delay).map(|_| BatchMessage::Flush(None));

// Spawn worker process via user-defined spawn function.
let worker_handle = spawn(Box::pin(async move {
let _worker_handle = spawn(Box::pin(async move {
let mut spans = Vec::new();
let mut messages = Box::pin(futures::stream::select(message_receiver, ticker));

Expand Down Expand Up @@ -344,7 +347,6 @@ impl BatchSpanProcessor {
// Return batch processor with link to worker
BatchSpanProcessor {
message_sender: Mutex::new(message_sender),
_worker_handle: Some(Box::pin(worker_handle)),
}
}

Expand Down

0 comments on commit ee26cd0

Please sign in to comment.