Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement timeout for force_flush and shutdown. #362

Merged
merged 11 commits into from Nov 20, 2020
9 changes: 5 additions & 4 deletions opentelemetry/Cargo.toml
Expand Up @@ -21,17 +21,17 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
async-std = { version = "1.6", features = ["unstable"], default-features = false, optional = true }
async-std = { version = "1.6", features = ["unstable"], default-features = false, optional = true }
async-trait = { version = "0.1", optional = true }
bincode = { version = "1.2", optional = true }
dashmap = { version = "4.0.0-rc6", optional = true }
fnv = { version = "1.0", optional = true }
futures = "0.3"
futures = { version = "0.3", features = ["std"] }
TommyCpp marked this conversation as resolved.
Show resolved Hide resolved
lazy_static = "1.4"
percent-encoding = { version = "2.0", optional = true }
pin-project = { version = "0.4", optional = true }
rand = { version = "0.7", default-features = false, features = ["std"], optional = true }
regex = { version = "1.3", default-features = false, features = ["std", "perf"], optional = true}
regex = { version = "1.3", default-features = false, features = ["std", "perf"], optional = true }
serde = { version = "1.0", features = ["derive", "rc"], optional = true }
http = { version = "0.2", optional = true }
thiserror = { version = "1.0", optional = true }
Expand All @@ -44,10 +44,11 @@ surf = { version = "2.0", default-features = false, optional = true }
criterion = "0.3.1"
rand_distr = "0.3.0"
tokio = { version = "0.2", features = ["full"] }
async-std = { version = "1.6", features = ["unstable"] }

[features]
default = ["trace"]
trace = ["rand", "pin-project", "async-trait", "regex", "percent-encoding"]
trace = ["rand", "pin-project", "async-trait", "regex", "percent-encoding", "thiserror"]
metrics = ["thiserror", "dashmap", "fnv"]
serialize = ["serde", "bincode"]

Expand Down
11 changes: 11 additions & 0 deletions opentelemetry/src/api/trace/mod.rs
Expand Up @@ -136,3 +136,14 @@ pub use self::{
},
tracer::{SpanBuilder, Tracer},
};

use thiserror::Error;
TommyCpp marked this conversation as resolved.
Show resolved Hide resolved

/// Errors returned by the trace API.
#[derive(Error, Debug, PartialEq)]
#[non_exhaustive]
pub enum TraceError {
TommyCpp marked this conversation as resolved.
Show resolved Hide resolved
/// Other errors not covered by specific cases.
#[error("Trace error: {0}")]
Other(String),
}
15 changes: 14 additions & 1 deletion opentelemetry/src/exporter/trace/mod.rs
Expand Up @@ -10,7 +10,8 @@ use http::Request;
use serde::{Deserialize, Serialize};
#[cfg(all(feature = "http", feature = "reqwest"))]
use std::convert::TryInto;
use std::fmt::Debug;
use std::error::Error;
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::time::SystemTime;

Expand All @@ -19,6 +20,18 @@ pub mod stdout;
/// Describes the result of an export.
pub type ExportResult = Result<(), Box<dyn std::error::Error + Send + Sync + 'static>>;

/// Timed out when exporting spans to remote
#[derive(Debug, Default)]
pub struct ExportTimedOutError {}

impl Display for ExportTimedOutError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("export timed out")
}
}

impl Error for ExportTimedOutError {}

/// `SpanExporter` defines the interface that protocol-specific exporters must
/// implement so that they can be plugged into OpenTelemetry SDK and support
/// sending of telemetry data.
Expand Down
10 changes: 8 additions & 2 deletions opentelemetry/src/sdk/trace/provider.rs
Expand Up @@ -25,7 +25,7 @@ pub(crate) struct TracerProviderInner {
impl Drop for TracerProviderInner {
fn drop(&mut self) {
for processor in &mut self.processors {
processor.shutdown();
let _result = processor.shutdown();
}
}
}
Expand Down Expand Up @@ -116,7 +116,12 @@ impl Builder {
// drop. We cannot assume we are in a multi-threaded tokio runtime here, so use
// `spawn_blocking` to avoid blocking the main thread.
let spawn = |fut| tokio::task::spawn_blocking(|| futures::executor::block_on(fut));
let batch = sdk::trace::BatchSpanProcessor::builder(exporter, spawn, tokio::time::interval);
let batch = sdk::trace::BatchSpanProcessor::builder(
exporter,
spawn,
tokio::time::delay_for,
tokio::time::interval,
);
self.with_batch_exporter(batch.build())
}

Expand All @@ -127,6 +132,7 @@ impl Builder {
let batch = sdk::trace::BatchSpanProcessor::builder(
exporter,
async_std::task::spawn,
async_std::task::sleep,
async_std::stream::interval,
);
self.with_batch_exporter(batch.build())
Expand Down