-
Notifications
You must be signed in to change notification settings - Fork 586
/
lib.rs
87 lines (75 loc) · 2.51 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use std::time::Duration;
use actix::{Actor, Addr, Context, Handler, Message};
use awc::{Client, Connector};
use futures::FutureExt;
use near_performance_metrics_macros::perf;
use serde::{Deserialize, Serialize};
use tracing::info;
/// Timeout for establishing connection.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct TelemetryConfig {
pub endpoints: Vec<String>,
}
/// Event to send over telemetry.
#[derive(Message, Debug)]
#[rtype(result = "()")]
pub struct TelemetryEvent {
content: serde_json::Value,
}
pub struct TelemetryActor {
config: TelemetryConfig,
client: Client,
}
impl Default for TelemetryActor {
fn default() -> Self {
Self::new(TelemetryConfig::default())
}
}
impl TelemetryActor {
pub fn new(config: TelemetryConfig) -> Self {
for endpoint in config.endpoints.iter() {
if endpoint.is_empty() {
panic!(
"All telemetry endpoints must be valid URLs. Received: {:?}",
config.endpoints
);
}
}
let client = Client::builder()
.timeout(CONNECT_TIMEOUT)
.connector(
Connector::new()
.conn_lifetime(Duration::from_secs(u64::max_value()))
.conn_keep_alive(Duration::from_secs(30)),
)
.finish();
Self { config, client }
}
}
impl Actor for TelemetryActor {
type Context = Context<Self>;
}
impl Handler<TelemetryEvent> for TelemetryActor {
type Result = ();
#[perf]
fn handle(&mut self, msg: TelemetryEvent, _ctx: &mut Context<Self>) {
for endpoint in self.config.endpoints.iter() {
near_performance_metrics::actix::spawn("telemetry", file!(), line!(),
self.client
.post(endpoint)
.insert_header(("Content-Type", "application/json"))
.send_json(&msg.content)
.map(|response| {
if let Err(error) = response {
info!(target: "telemetry", "Telemetry data could not be sent due to: {}", error);
}
}),
);
}
}
}
/// Send telemetry event to all the endpoints.
pub fn telemetry(telemetry: &Addr<TelemetryActor>, content: serde_json::Value) {
telemetry.do_send(TelemetryEvent { content });
}