-
Notifications
You must be signed in to change notification settings - Fork 384
/
main.rs
122 lines (102 loc) · 4.05 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use futures_util::{Stream, StreamExt as _};
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::sdk::metrics::{selectors, PushController};
use opentelemetry::trace::TraceError;
use opentelemetry::{
baggage::BaggageExt,
metrics::{self, ObserverResult},
trace::{TraceContextExt, Tracer},
Context, Key, KeyValue,
};
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry_otlp::{ExportConfig, WithExportConfig};
use std::error::Error;
use std::time::Duration;
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint("http://localhost:4317"),
)
.install_batch(opentelemetry::runtime::Tokio)
}
// Skip first immediate tick from tokio, not needed for async_std.
fn delayed_interval(duration: Duration) -> impl Stream<Item = tokio::time::Instant> {
opentelemetry::util::tokio_interval_stream(duration).skip(1)
}
fn init_meter() -> metrics::Result<PushController> {
let export_config = ExportConfig {
endpoint: "http://localhost:4317".to_string(),
..ExportConfig::default()
};
opentelemetry_otlp::new_pipeline()
.metrics(tokio::spawn, delayed_interval)
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_export_config(export_config),
)
.with_aggregator_selector(selectors::simple::Selector::Exact)
.build()
}
const FOO_KEY: Key = Key::from_static_str("ex.com/foo");
const BAR_KEY: Key = Key::from_static_str("ex.com/bar");
const LEMONS_KEY: Key = Key::from_static_str("lemons");
const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another");
lazy_static::lazy_static! {
static ref COMMON_ATTRIBUTES: [KeyValue; 4] = [
LEMONS_KEY.i64(10),
KeyValue::new("A", "1"),
KeyValue::new("B", "2"),
KeyValue::new("C", "3"),
];
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// By binding the result to an unused variable, the lifetime of the variable
// matches the containing block, reporting traces and metrics during the whole
// execution.
let _ = init_tracer()?;
let _started = init_meter()?;
let tracer = global::tracer("ex.com/basic");
let meter = global::meter("ex.com/basic");
let one_metric_callback =
|res: ObserverResult<f64>| res.observe(1.0, COMMON_ATTRIBUTES.as_ref());
let _ = meter
.f64_value_observer("ex.com.one", one_metric_callback)
.with_description("A ValueObserver set to 1.0")
.init();
let histogram_two = meter.f64_histogram("ex.com.two").init();
let another_recorder = meter.f64_histogram("ex.com.two").init();
another_recorder.record(5.5, COMMON_ATTRIBUTES.as_ref());
let _baggage =
Context::current_with_baggage(vec![FOO_KEY.string("foo1"), BAR_KEY.string("bar1")])
.attach();
let histogram = histogram_two.bind(COMMON_ATTRIBUTES.as_ref());
tracer.in_span("operation", |cx| {
let span = cx.span();
span.add_event(
"Nice operation!".to_string(),
vec![Key::new("bogons").i64(100)],
);
span.set_attribute(ANOTHER_KEY.string("yes"));
meter.record_batch_with_context(
// Note: call-site variables added as context Entries:
&Context::current_with_baggage(vec![ANOTHER_KEY.string("xyz")]),
COMMON_ATTRIBUTES.as_ref(),
vec![histogram_two.measurement(2.0)],
);
tracer.in_span("Sub operation...", |cx| {
let span = cx.span();
span.set_attribute(LEMONS_KEY.string("five"));
span.add_event("Sub span event".to_string(), vec![]);
histogram.record(1.3);
});
});
// wait for 1 minutes so that we could see metrics being pushed via OTLP every 10 seconds.
tokio::time::sleep(Duration::from_secs(60)).await;
shutdown_tracer_provider();
Ok(())
}