-
Notifications
You must be signed in to change notification settings - Fork 2
/
mod.rs
115 lines (100 loc) · 2.85 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use chrono;
use async_trait::async_trait;
pub mod influx;
pub mod print;
use self::influx::{InfluxSink, InfluxConfig};
use self::print::{PrintSink, PrintConfig};
use self::chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::fmt;
/// Configuration for a single sink, one variant per supported sink kind.
///
/// Deserialized with serde's internally tagged representation: the `type`
/// field of the serialized form selects the variant, and the remaining fields
/// are deserialized into that variant's configuration struct.
#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum SinkConfig {
/// Settings for a sink built by `InfluxSink::from_config`.
Influx(InfluxConfig),
/// Settings for a sink built by `PrintSink::from_config`.
Print(PrintConfig),
}
/// A destination that [`Measurement`]s can be handed to.
///
/// The trait is declared through `#[async_trait]` so that it can contain
/// `async fn` methods. Both methods take `&mut self` and return nothing, so
/// this interface gives implementations no way to report a failure to the
/// caller.
#[async_trait]
pub trait Sink {
/// Hands one measurement to the sink.
///
/// NOTE(review): presumably implementations may buffer the measurement until
/// `submit` is called — confirm against the `influx` and `print` modules.
async fn add_measurement(&mut self, _: &Measurement);
/// Asks the sink to submit what it has been given.
///
/// NOTE(review): the exact semantics (flush, send, print) are defined by the
/// implementations and are not visible in this module — confirm there.
async fn submit(&mut self);
}
/// A single tag or field value attached to a [`Measurement`].
#[derive(Debug, Clone)]
pub enum Value {
    /// Arbitrary text.
    String(String),
    /// A signed 64-bit integer.
    Integer(i64),
    /// A 64-bit floating point number.
    Float(f64),
}

impl fmt::Display for Value {
    /// Renders the bare payload: text verbatim, integers in decimal, and
    /// floats with exactly four digits after the decimal point.
    ///
    /// Width, fill and alignment requested by the caller's format spec are
    /// not applied; every variant writes its own fixed representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Value::String(ref text) => f.write_str(text),
            Value::Integer(number) => write!(f, "{}", number),
            Value::Float(number) => write!(f, "{:0.4}", number),
        }
    }
}
/// Builds one boxed sink for every entry in `sink_configs`.
///
/// The returned vector has the same length and order as the input; an empty
/// input yields an empty vector. Each entry is dispatched to the `from_config`
/// constructor of the sink type matching its variant.
///
/// The parameter is a slice rather than `&Vec<_>`, so callers can pass any
/// contiguous collection; an existing `&Vec<SinkConfig>` argument still works
/// through deref coercion.
pub fn from_config(sink_configs: &[SinkConfig]) -> Vec<Box<dyn Sink + Send>> {
    sink_configs
        .iter()
        .map(|sink_config| match sink_config {
            SinkConfig::Influx(cfg) => InfluxSink::from_config(cfg),
            SinkConfig::Print(cfg) => PrintSink::from_config(cfg),
        })
        .collect()
}
/// One named data point: a measurement name, its fields and tags, and a
/// timestamp.
#[derive(Debug, Clone)]
pub struct Measurement {
/// Name of the measurement; written first by the `Display` implementation.
pub measurement: String,
/// Field values keyed by field name.
pub fields: HashMap<String, Value>,
/// Tag values keyed by tag name.
pub tags: HashMap<String, Value>,
/// Time associated with the measurement, in UTC. `Measurement::new` sets it
/// to the moment of construction.
pub timestamp: DateTime<Utc>,
}
impl Measurement {
    /// Creates a measurement called `measurement` with no fields, no tags and
    /// the current UTC time as its timestamp.
    pub fn new(measurement: &str) -> Measurement {
        let timestamp = Utc::now();
        Self {
            measurement: measurement.to_owned(),
            fields: HashMap::default(),
            tags: HashMap::default(),
            timestamp,
        }
    }

    /// Stores `value` under the field `name` and returns the measurement so
    /// calls can be chained. A previous value for the same name is replaced.
    pub fn field(mut self, name: &str, value: Value) -> Self {
        let key = name.to_owned();
        self.fields.insert(key, value);
        self
    }

    /// Stores `value` under the tag `name` and returns the measurement so
    /// calls can be chained. A previous value for the same name is replaced.
    pub fn tag(mut self, name: &str, value: Value) -> Self {
        let key = name.to_owned();
        self.tags.insert(key, value);
        self
    }
}
impl fmt::Display for Measurement {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{} {} ",
self.measurement,
self.timestamp.format("%Y-%m-%d %H:%M:%S%.f")
)?;
let mut tags: Vec<_> = self.tags.iter().collect();
tags.sort_by(|a, b| a.0.cmp(b.0));
for (idx, (key, value)) in tags.iter().enumerate() {
if idx != 0 {
write!(f, ",")?;
}
write!(f, "{}={}", key, value)?;
}
if tags.len() > 0 {
write!(f, " ")?;
}
let mut fields: Vec<_> = self.fields.iter().collect();
fields.sort_by(|a, b| a.0.cmp(b.0));
for (idx, (key, value)) in fields.iter().enumerate() {
if idx != 0 {
write!(f, ",")?;
}
write!(f, "{}={}", key, value)?;
}
Ok(())
}
}