Skip to content

Commit

Permalink
cargo: Downgrade to tokio 0.2, switch to influxdb, make async
Browse files Browse the repository at this point in the history
Switch to asynchronous execution using tokio, and switch to the influxdb crate,
which uses reqwest (via its asynchronous API).

Downgrade to tokio 0.2.x, because mixing 0.2.x and 0.3.x in the same
process does not work. See
seanmonstar/reqwest#1060.
  • Loading branch information
neverpanic committed Nov 7, 2020
1 parent 7f2e661 commit f70bacd
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 110 deletions.
158 changes: 149 additions & 9 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -9,9 +9,10 @@ async-trait = "^0.1.41"
chrono = "^0.4.19"
futures = "^0.3.7"
hidapi = "^1.2.3"
influx_db_client = "^0.4.4"
influxdb = "^0.2.0"
rand = "^0.7.3"
rust-crypto = "^0.2"
serde = "^1.0.117"
serde_derive = "^1.0.117"
tokio = { version = "^0.2.5", features = ["full"] }
toml = "^0.5.7"
115 changes: 66 additions & 49 deletions src/main.rs
@@ -1,7 +1,9 @@
#[macro_use]
extern crate serde_derive;
use futures::join;
use tokio::sync::mpsc::{channel, Sender, Receiver, error::TryRecvError};
use tokio::time::{interval, Duration};
use toml;
use futures::executor::block_on;

mod sensor;
mod sink;
Expand All @@ -13,9 +15,6 @@ use std::env;
use std::error;
use std::fs;
use std::process;
use std::sync::mpsc::{channel, TryRecvError};
use std::thread;
use std::time::{Duration, Instant};

#[derive(Deserialize)]
struct Config {
Expand All @@ -28,78 +27,96 @@ fn parse_config(filename: &str) -> Result<Config, Box<dyn error::Error>> {
Ok(config)
}

fn main() {
let args: Vec<String> = env::args().collect();
if args.len() < 2 {
println!("Usage: {} config-file", args[0]);
process::exit(1);
}
async fn consumer(config_sinks: &Vec<SinkConfig>, mut rx: Receiver<sink::Measurement>) {
let mut sinks = sink::from_config(&config_sinks);
let mut interval = interval(Duration::from_secs(1));

let config = match parse_config(&args[1]) {
Ok(config) => config,
Err(err) => {
println!("Failed to parse configuration file {}: {}", args[1], err);
process::exit(1);
}
};
loop {
interval.tick().await;

let config_sinks = config.sink;
let (tx, rx) = channel::<sink::Measurement>();
let writer = thread::spawn(move || {
let mut sinks = sink::from_config(&config_sinks);
for sink in &mut sinks {
sink.submit().await;
}

loop {
thread::sleep(Duration::from_secs(1));
for sink in &mut sinks {
block_on(sink.submit())
}
match rx.try_recv() {
Ok(measurement) => {
for sink in &mut sinks {
sink.add_measurement(&measurement)
}
match rx.try_recv() {
Ok(measurement) => {
for sink in &mut sinks {
sink.add_measurement(&measurement).await;
}
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => continue,
}
Err(TryRecvError::Closed) => break,
Err(TryRecvError::Empty) => continue,
}
for sink in &mut sinks {
block_on(sink.submit())
}
});
}
for sink in &mut sinks {
sink.submit().await;
}
}

async fn producer(config_poll_interval: Option<u64>, mut tx: Sender<sink::Measurement>) {
let mut devices = match Sensor::sensors() {
None => vec![],
Some(devices) => devices,
};

let poll_interval = match config.poll_interval {
let poll_interval = match config_poll_interval {
Some(i) => Duration::from_secs(i),
None => Default::default(),
None => Duration::from_millis(1),
};

let mut interval = interval(poll_interval);

while devices.len() > 0 {
let begin_poll = Instant::now();
interval.tick().await;

let mut measurements = vec![];
devices.retain(|device| match device.read() {
None => false,
Some(measurement) => {
tx.send(measurement).unwrap();
measurements.push(measurement);
true
}
});

let elapsed = Instant::now() - begin_poll;
let sleepfor = poll_interval
.checked_sub(elapsed)
.unwrap_or_default();
thread::sleep(sleepfor);
for measurement in measurements {
tx.send(measurement).await.unwrap();
}
}

eprintln!("No devices left to query, exiting after all data has been written!");

drop(tx);
writer.join().unwrap();
}

/// Entry point: parses the configuration file named by the first CLI
/// argument, then runs the sensor producer and the sink consumer as two
/// concurrent tokio tasks connected by a bounded mpsc channel.
#[tokio::main]
async fn main() {
    let args: Vec<String> = env::args().collect();
    if args.len() < 2 {
        println!("Usage: {} config-file", args[0]);
        process::exit(1);
    }

    let config = match parse_config(&args[1]) {
        Ok(config) => config,
        Err(err) => {
            println!("Failed to parse configuration file {}: {}", args[1], err);
            process::exit(1);
        }
    };

    // Bounded channel (capacity 50): once full, the producer's send().await
    // suspends, applying backpressure instead of queueing without limit.
    let (tx, rx) = channel::<sink::Measurement>(50);

    // Copy out poll_interval before `config` is moved into the consumer task.
    let poll_interval = config.poll_interval;
    let consumer = tokio::spawn(async move {
        consumer(&config.sink, rx).await;
    });

    let producer = tokio::spawn(async move {
        producer(poll_interval, tx).await;
        // Dropping `tx` when this task ends closes the channel, which lets
        // the consumer drain remaining measurements and terminate.
        eprintln!("No devices left to query, exiting after all data has been written!");
    });

    // Await both tasks; unwrap() re-raises a panic from either task here.
    let (producer_result, consumer_result) = join!(producer, consumer);
    producer_result.unwrap();
    consumer_result.unwrap();

    // NOTE(review): reaching this point means the producer ran out of
    // devices; exiting non-zero presumably marks that as an error condition
    // for supervisors — confirm this is intended rather than exit(0).
    process::exit(1);
}
115 changes: 68 additions & 47 deletions src/sink/influx.rs
@@ -1,10 +1,10 @@
use chrono;
use influx_db_client;
use async_trait::async_trait;
use chrono::{DateTime, Utc, Duration};
use influxdb::{Client, WriteQuery, Timestamp};

use self::chrono::{Utc, Duration};
use self::influx_db_client::{Client, Point, Points, Value, Precision, point};
use super::{Sink, Value as SinkValue};
use super::{Sink, Value};

use std::option::Option;


#[derive(Deserialize)]
Expand All @@ -19,79 +19,100 @@ pub struct InfluxConfig {
pub struct InfluxSink {
client: Client,
bulk_time: Duration,
points: Points,
queries: Vec<WriteQuery>,
oldest_data: Option<DateTime<Utc>>,
}

trait ToInflux {
fn to_influx(&self) -> Value;
trait AddToQuery {
fn add_query_field(&self, query: WriteQuery, key: &String) -> WriteQuery;
fn add_query_tag(&self, query: WriteQuery, key: &String) -> WriteQuery;
}

impl ToInflux for SinkValue {
fn to_influx(&self) -> Value {
impl AddToQuery for Value {
fn add_query_field(&self, query: WriteQuery, key: &String) -> WriteQuery {
match self {
Value::String(val) =>
query.add_field(key, val.to_owned()),
Value::Integer(val) =>
query.add_field(key, *val),
Value::Float(val) =>
query.add_field(key, *val),
}
}

fn add_query_tag(&self, query: WriteQuery, key: &String) -> WriteQuery {
match self {
SinkValue::String(val) => Value::String(val.to_owned()),
SinkValue::Integer(val) => Value::Integer(*val),
SinkValue::Float(val) => Value::Float(*val),
Value::String(val) =>
query.add_tag(key, val.to_owned()),
Value::Integer(val) =>
query.add_tag(key, *val),
Value::Float(val) =>
query.add_tag(key, *val),
}
}
}

#[async_trait]
impl Sink for InfluxSink {
fn add_measurement(&mut self, measurement: &super::Measurement) {
let mut point = point!(&measurement.measurement);
async fn add_measurement(&mut self, measurement: &super::Measurement) {
let mut query = WriteQuery::new(
Timestamp::Nanoseconds(measurement.timestamp.timestamp_nanos() as u128),
&measurement.measurement);

for (key, value) in &measurement.fields {
point = point.add_field(key, value.to_influx());
query = value.add_query_field(query, key);
}
for (key, value) in &measurement.tags {
point = point.add_tag(key, value.to_influx());
query = value.add_query_tag(query, key);
}
point = point.add_timestamp(measurement.timestamp.timestamp_nanos());

self.points.point.push(point.to_owned());
self.queries.push(query);
if self.oldest_data.is_none() {
self.oldest_data = Some(measurement.timestamp);
}
}

async fn submit(&mut self) -> () {
let mut submit = false;
if self.points.point.len() > 0 {
submit = match self.points.point[0].timestamp {
None => false,
Some(ts) => {
let expiry_boundary = ts + self.bulk_time.num_nanoseconds().unwrap_or(0);
let now = Utc::now().timestamp_nanos();
expiry_boundary < now
}
}
}
let submit = match self.oldest_data {
None => false,
Some(timestamp) => timestamp + self.bulk_time < Utc::now(),
};

if submit {
let num_points = self.points.point.len();
match self.client.write_points(
&mut self.points,
Some(Precision::Nanoseconds),
None,
).await {
Ok(_) => {
println!("----- {} submitted -----", num_points);
self.points = Points::create_new(Vec::new())
let mut successful = 0;
for query in &self.queries {
successful += match self.client.query(query).await {
Ok(_) => 1,
Err(err) => {
println!("Failed to execute query: {}", err);
0
}
}
Err(err) => println!("Failed to submit points: {}", err),
}

if successful > 0 {
println!("----- {} submitted -----", successful);
}

self.queries.clear();
self.oldest_data = None;
}
}
}

impl InfluxSink {
pub fn from_config(config: &InfluxConfig) -> Box<dyn Sink> {
let client = Client::new(
config.host.parse().unwrap(),
config.database.to_owned()
).set_authentication(config.user.to_owned(), config.pass.to_owned());
pub fn from_config(config: &InfluxConfig) -> Box<dyn Sink + Send> {
let client = Client::new(&config.host, &config.database)
.with_auth(&config.user, &config.pass);
let queries = vec![];
let bulk_time = Duration::seconds(config.bulk_time);
let points = Points::create_new(Vec::new());
let oldest_data: Option<DateTime<Utc>> = None;

Box::new(InfluxSink {
client,
bulk_time,
points,
queries,
oldest_data,
})
}
}
4 changes: 2 additions & 2 deletions src/sink/mod.rs
Expand Up @@ -20,7 +20,7 @@ pub enum SinkConfig {

#[async_trait]
pub trait Sink {
fn add_measurement(&mut self, _: &Measurement);
async fn add_measurement(&mut self, _: &Measurement);
async fn submit(&mut self);
}

Expand All @@ -41,7 +41,7 @@ impl fmt::Display for Value {
}
}

pub fn from_config(sink_configs: &Vec<SinkConfig>) -> Vec<Box<dyn Sink>> {
pub fn from_config(sink_configs: &Vec<SinkConfig>) -> Vec<Box<dyn Sink + Send>> {
let mut sinks = Vec::new();
for sink_config in sink_configs {
sinks.push(match sink_config {
Expand Down
4 changes: 2 additions & 2 deletions src/sink/print.rs
Expand Up @@ -12,7 +12,7 @@ pub struct PrintSink {

#[async_trait]
impl Sink for PrintSink {
fn add_measurement(&mut self, measurement: &super::Measurement) {
async fn add_measurement(&mut self, measurement: &super::Measurement) {
self.points.push(measurement.to_owned());
}

Expand All @@ -25,7 +25,7 @@ impl Sink for PrintSink {
}

impl PrintSink {
pub fn from_config(_config: &PrintConfig) -> Box<dyn Sink> {
pub fn from_config(_config: &PrintConfig) -> Box<dyn Sink + Send> {
Box::new(PrintSink { points: Vec::new() })
}
}

0 comments on commit f70bacd

Please sign in to comment.