From f70bacd0c6e903df6862255faae95636a90072c2 Mon Sep 17 00:00:00 2001 From: Clemens Lang Date: Sat, 7 Nov 2020 05:03:03 +0100 Subject: [PATCH] cargo: Downgrade to tokio 0.2, switch to influxdb, make async Use asynchronous execution using tokio, switch to the influxdb crate, which uses reqwests (in its asynchronous APIs). Downgrade to tokio 0.2.x, because mixing 0.2.x and 0.3.x in the same process does not work. See https://github.com/seanmonstar/reqwest/issues/1060. --- Cargo.lock | 158 ++++++++++++++++++++++++++++++++++++++++++--- Cargo.toml | 3 +- src/main.rs | 115 +++++++++++++++++++-------------- src/sink/influx.rs | 115 +++++++++++++++++++-------------- src/sink/mod.rs | 4 +- src/sink/print.rs | 4 +- 6 files changed, 289 insertions(+), 110 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba41769..c55f426 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,13 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "aho-corasick" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "memchr 2.3.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "async-trait" version = "0.1.41" @@ -37,7 +45,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "cc" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -58,6 +66,7 @@ dependencies = [ "libc 0.2.80 (registry+https://github.com/rust-lang/crates.io-index)", "num-integer 0.1.44 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.117 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.44 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -70,11 +79,12 @@ dependencies = [ "chrono 0.4.19 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "hidapi 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "influx_db_client 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", + "influxdb 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", "rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.117 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.117 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -276,12 +286,20 @@ name = "hashbrown" version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "hermit-abi" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.80 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hidapi" version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "cc 1.0.61 (registry+https://github.com/rust-lang/crates.io-index)", + "cc 1.0.62 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.80 (registry+https://github.com/rust-lang/crates.io-index)", "pkg-config 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -370,15 +388,18 @@ dependencies = [ ] [[package]] -name = "influx_db_client" -version = "0.4.4" +name = "influxdb" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)", + "chrono 0.4.19 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "regex 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.10.8 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.117 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.59 (registry+https://github.com/rust-lang/crates.io-index)", + "thiserror 1.0.22 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -476,6 +497,27 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "mio-named-pipes" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", + "miow 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "mio-uds" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.80 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "miow" version = "0.2.1" @@ -487,6 +529,15 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "miow" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "socket2 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "native-tls" version = "0.2.5" @@ -531,6 +582,15 @@ dependencies = [ "autocfg 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "hermit-abi 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.80 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "once_cell" version = "1.4.1" @@ -560,7 +620,7 @@ version = "0.9.58" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "autocfg 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "cc 1.0.61 (registry+https://github.com/rust-lang/crates.io-index)", + "cc 1.0.62 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.80 (registry+https://github.com/rust-lang/crates.io-index)", "pkg-config 0.3.19 (registry+https://github.com/rust-lang/crates.io-index)", "vcpkg 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", @@ -737,6 +797,22 @@ name = "redox_syscall" version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "regex" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "aho-corasick 0.7.15 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "regex-syntax 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)", + "thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "regex-syntax" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "remove_dir_all" version = "0.5.3" @@ -871,6 +947,14 @@ dependencies = [ "url 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "signal-hook-registry" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.80 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "slab" version = "0.4.2" @@ -910,6 +994,32 @@ dependencies = [ "winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "thiserror" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "thiserror-impl 1.0.22 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "time" version = "0.1.44" @@ -935,10 +1045,27 @@ dependencies = [ "futures-core 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.80 (registry+https://github.com/rust-lang/crates.io-index)", "memchr 2.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-named-pipes 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-uds 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)", + "num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "pin-project-lite 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", + "signal-hook-registry 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "tokio-macros" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.24 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -1196,13 +1323,14 @@ dependencies = [ ] [metadata] +"checksum aho-corasick 0.7.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5" "checksum async-trait 0.1.41 (registry+https://github.com/rust-lang/crates.io-index)" = "b246867b8b3b6ae56035f1eb1ed557c1d8eae97f0d53696138a50fa0e3a3b8c0" "checksum autocfg 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" "checksum base64 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum bumpalo 3.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" "checksum bytes 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)" = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" -"checksum cc 1.0.61 (registry+https://github.com/rust-lang/crates.io-index)" = "ed67cbde08356238e75fc4656be4749481eeffb09e19f320a25237d5221c985d" +"checksum cc 1.0.62 (registry+https://github.com/rust-lang/crates.io-index)" = "f1770ced377336a88a67c473594ccc14eca6f4559217c34f64aac8f83d641b40" "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" "checksum cfg-if 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" "checksum chrono 0.4.19 (registry+https://github.com/rust-lang/crates.io-index)" = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" @@ -1230,6 +1358,7 @@ dependencies = [ "checksum getrandom 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" "checksum h2 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)" = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535" "checksum hashbrown 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" +"checksum hermit-abi 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)" = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8" "checksum hidapi 1.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "5c6ffb97f2ec5835ec73bcea5256fc2cd57a13c5958230778ef97f11900ba661" "checksum http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" "checksum http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" @@ -1239,7 +1368,7 @@ dependencies = [ "checksum hyper-tls 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed" "checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" "checksum indexmap 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "55e2e4c765aa53a0424761bf9f41aa7a6ac1efa87238f59560640e27fca028f2" -"checksum influx_db_client 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "20f7ba194095b44d05d7d2907fb3a04e7ea7ae1591cdd41fe800e555e51903c4" +"checksum influxdb 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2a717cb443ed2232628e3f79cdeef84ec63e56df6e1732c676d4dba08c0dd2f7" "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" "checksum ipnet 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135" "checksum itoa 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" @@ -1253,11 +1382,15 @@ dependencies = [ "checksum mime 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)" = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" "checksum mime_guess 2.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" "checksum mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)" = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" +"checksum mio-named-pipes 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" +"checksum mio-uds 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)" = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +"checksum miow 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e" "checksum native-tls 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "1a1cda389c26d6b88f3d2dc38aa1b750fe87d298cc5d795ec9e975f402f00372" "checksum net2 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)" = "3ebc3ec692ed7c9a255596c67808dee269f64655d8baf7b4f0638e51ba1d6853" "checksum num-integer 0.1.44 (registry+https://github.com/rust-lang/crates.io-index)" = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" "checksum num-traits 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +"checksum num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" "checksum once_cell 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "260e51e7efe62b592207e9e13a68e43692a7a279171d6ba57abd208bf23645ad" "checksum openssl 0.10.30 (registry+https://github.com/rust-lang/crates.io-index)" = "8d575eff3665419f9b83678ff2815858ad9d11567e082f5ac1814baba4e2bcb4" "checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" @@ -1285,6 +1418,8 @@ dependencies = [ "checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" "checksum rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" "checksum redox_syscall 0.1.57 (registry+https://github.com/rust-lang/crates.io-index)" = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" +"checksum regex 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" +"checksum regex-syntax 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)" = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" "checksum remove_dir_all 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" "checksum reqwest 0.10.8 (registry+https://github.com/rust-lang/crates.io-index)" = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e" "checksum rust-crypto 0.2.36 (registry+https://github.com/rust-lang/crates.io-index)" = "f76d05d3993fd5f4af9434e8e436db163a12a9d40e1a58a726f27a01dfd12a2a" @@ -1297,13 +1432,18 @@ dependencies = [ "checksum serde_derive 1.0.117 (registry+https://github.com/rust-lang/crates.io-index)" = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e" "checksum serde_json 1.0.59 (registry+https://github.com/rust-lang/crates.io-index)" = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95" "checksum serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" +"checksum signal-hook-registry 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ce32ea0c6c56d5eacaeb814fbed9960547021d3edd010ded1425f180536b20ab" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum socket2 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "b1fa70dc5c8104ec096f4fe7ede7a221d35ae13dcd19ba1ad9a81d2cab9a1c44" "checksum syn 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)" = "cc371affeffc477f42a221a1e4297aedcea33d47d19b61455588bd9d8f6b19ac" "checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" +"checksum thiserror 1.0.22 (registry+https://github.com/rust-lang/crates.io-index)" = "0e9ae34b84616eedaaf1e9dd6026dbe00dcafa92aa0c8077cb69df1fcfe5e53e" +"checksum thiserror-impl 1.0.22 (registry+https://github.com/rust-lang/crates.io-index)" = "9ba20f23e85b10754cd195504aebf6a27e2e6cbe28c17778a0c930724628dd56" +"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" "checksum time 0.1.44 (registry+https://github.com/rust-lang/crates.io-index)" = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" "checksum tinyvec 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "238ce071d267c5710f9d31451efec16c5ee22de34df17cc05e56cbc92e967117" "checksum tokio 0.2.22 (registry+https://github.com/rust-lang/crates.io-index)" = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd" +"checksum tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" "checksum tokio-tls 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343" "checksum tokio-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" "checksum toml 0.5.7 (registry+https://github.com/rust-lang/crates.io-index)" = "75cf45bb0bef80604d001caaec0d09da99611b3c0fd39d3080468875cdb65645" diff --git a/Cargo.toml b/Cargo.toml index 849b582..7353103 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,9 +9,10 @@ async-trait = "^0.1.41" chrono = "^0.4.19" futures = "^0.3.7" hidapi = "^1.2.3" -influx_db_client = "^0.4.4" +influxdb = "^0.2.0" rand = "^0.7.3" rust-crypto = "^0.2" serde = "^1.0.117" serde_derive = "^1.0.117" +tokio = { version = "^0.2.5", features = ["full"] } toml = "^0.5.7" diff --git a/src/main.rs b/src/main.rs index e1cb47f..8ac98a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,9 @@ #[macro_use] extern crate serde_derive; +use futures::join; +use tokio::sync::mpsc::{channel, Sender, Receiver, error::TryRecvError}; +use tokio::time::{interval, Duration}; use toml; -use futures::executor::block_on; mod sensor; mod sink; @@ -13,9 +15,6 @@ use std::env; use std::error; use std::fs; use std::process; -use std::sync::mpsc::{channel, TryRecvError}; -use std::thread; -use std::time::{Duration, Instant}; #[derive(Deserialize)] struct Config { @@ -28,78 +27,96 @@ fn parse_config(filename: &str) -> Result> { Ok(config) } -fn main() { - let args: Vec = env::args().collect(); - if args.len() < 2 { - println!("Usage: {} config-file", args[0]); - process::exit(1); - } +async fn consumer(config_sinks: &Vec, mut rx: Receiver) { + let mut sinks = sink::from_config(&config_sinks); + let mut interval = interval(Duration::from_secs(1)); - let config = match parse_config(&args[1]) { - Ok(config) => config, - Err(err) => { - println!("Failed to parse configuration file {}: {}", args[1], err); - process::exit(1); - } - }; + loop { + interval.tick().await; - let config_sinks = config.sink; - let (tx, rx) = channel::(); - let writer = thread::spawn(move || { - let mut sinks = sink::from_config(&config_sinks); + for sink in &mut sinks { + sink.submit().await; + } - loop { - thread::sleep(Duration::from_secs(1)); - for sink in &mut sinks { - block_on(sink.submit()) - } - match rx.try_recv() { - Ok(measurement) => { - for sink in &mut sinks { - sink.add_measurement(&measurement) - } + match rx.try_recv() { + Ok(measurement) => { + for sink in &mut sinks { + sink.add_measurement(&measurement).await; } - Err(TryRecvError::Disconnected) => break, - Err(TryRecvError::Empty) => continue, } + Err(TryRecvError::Closed) => break, + Err(TryRecvError::Empty) => continue, } - for sink in &mut sinks { - block_on(sink.submit()) - } - }); + } + for sink in &mut sinks { + sink.submit().await; + } +} +async fn producer(config_poll_interval: Option, mut tx: Sender) { let mut devices = match Sensor::sensors() { None => vec![], Some(devices) => devices, }; - let poll_interval = match config.poll_interval { + let poll_interval = match config_poll_interval { Some(i) => Duration::from_secs(i), - None => Default::default(), + None => Duration::from_millis(1), }; + let mut interval = interval(poll_interval); + while devices.len() > 0 { - let begin_poll = Instant::now(); + interval.tick().await; + let mut measurements = vec![]; devices.retain(|device| match device.read() { None => false, Some(measurement) => { - tx.send(measurement).unwrap(); + measurements.push(measurement); true } }); - let elapsed = Instant::now() - begin_poll; - let sleepfor = poll_interval - .checked_sub(elapsed) - .unwrap_or_default(); - thread::sleep(sleepfor); + for measurement in measurements { + tx.send(measurement).await.unwrap(); + } } - eprintln!("No devices left to query, exiting after all data has been written!"); - drop(tx); - writer.join().unwrap(); +} + +#[tokio::main()] +async fn main() { + let args: Vec = env::args().collect(); + if args.len() < 2 { + println!("Usage: {} config-file", args[0]); + process::exit(1); + } + + let config = match parse_config(&args[1]) { + Ok(config) => config, + Err(err) => { + println!("Failed to parse configuration file {}: {}", args[1], err); + process::exit(1); + } + }; + + let (tx, rx) = channel::(50); + + let poll_interval = config.poll_interval; + let consumer = tokio::spawn(async move { + consumer(&config.sink, rx).await; + }); + + let producer = tokio::spawn(async move { + producer(poll_interval, tx).await; + eprintln!("No devices left to query, exiting after all data has been written!"); + }); + + let (producer_result, consumer_result) = join!(producer, consumer); + producer_result.unwrap(); + consumer_result.unwrap(); process::exit(1); } diff --git a/src/sink/influx.rs b/src/sink/influx.rs index 182f140..29154ed 100644 --- a/src/sink/influx.rs +++ b/src/sink/influx.rs @@ -1,10 +1,10 @@ -use chrono; -use influx_db_client; use async_trait::async_trait; +use chrono::{DateTime, Utc, Duration}; +use influxdb::{Client, WriteQuery, Timestamp}; -use self::chrono::{Utc, Duration}; -use self::influx_db_client::{Client, Point, Points, Value, Precision, point}; -use super::{Sink, Value as SinkValue}; +use super::{Sink, Value}; + +use std::option::Option; #[derive(Deserialize)] @@ -19,79 +19,100 @@ pub struct InfluxConfig { pub struct InfluxSink { client: Client, bulk_time: Duration, - points: Points, + queries: Vec, + oldest_data: Option>, } -trait ToInflux { - fn to_influx(&self) -> Value; +trait AddToQuery { + fn add_query_field(&self, query: WriteQuery, key: &String) -> WriteQuery; + fn add_query_tag(&self, query: WriteQuery, key: &String) -> WriteQuery; } -impl ToInflux for SinkValue { - fn to_influx(&self) -> Value { +impl AddToQuery for Value { + fn add_query_field(&self, query: WriteQuery, key: &String) -> WriteQuery { + match self { + Value::String(val) => + query.add_field(key, val.to_owned()), + Value::Integer(val) => + query.add_field(key, *val), + Value::Float(val) => + query.add_field(key, *val), + } + } + + fn add_query_tag(&self, query: WriteQuery, key: &String) -> WriteQuery { match self { - SinkValue::String(val) => Value::String(val.to_owned()), - SinkValue::Integer(val) => Value::Integer(*val), - SinkValue::Float(val) => Value::Float(*val), + Value::String(val) => + query.add_tag(key, val.to_owned()), + Value::Integer(val) => + query.add_tag(key, *val), + Value::Float(val) => + query.add_tag(key, *val), } } } #[async_trait] impl Sink for InfluxSink { - fn add_measurement(&mut self, measurement: &super::Measurement) { - let mut point = point!(&measurement.measurement); + async fn add_measurement(&mut self, measurement: &super::Measurement) { + let mut query = WriteQuery::new( + Timestamp::Nanoseconds(measurement.timestamp.timestamp_nanos() as u128), + &measurement.measurement); + for (key, value) in &measurement.fields { - point = point.add_field(key, value.to_influx()); + query = value.add_query_field(query, key); } for (key, value) in &measurement.tags { - point = point.add_tag(key, value.to_influx()); + query = value.add_query_tag(query, key); } - point = point.add_timestamp(measurement.timestamp.timestamp_nanos()); - self.points.point.push(point.to_owned()); + self.queries.push(query); + if self.oldest_data.is_none() { + self.oldest_data = Some(measurement.timestamp); + } } async fn submit(&mut self) -> () { - let mut submit = false; - if self.points.point.len() > 0 { - submit = match self.points.point[0].timestamp { - None => false, - Some(ts) => { - let expiry_boundary = ts + self.bulk_time.num_nanoseconds().unwrap_or(0); - let now = Utc::now().timestamp_nanos(); - expiry_boundary < now - } - } - } + let submit = match self.oldest_data { + None => false, + Some(timestamp) => timestamp + self.bulk_time < Utc::now(), + }; + if submit { - let num_points = self.points.point.len(); - match self.client.write_points( - &mut self.points, - Some(Precision::Nanoseconds), - None, - ).await { - Ok(_) => { - println!("----- {} submitted -----", num_points); - self.points = Points::create_new(Vec::new()) + let mut successful = 0; + for query in &self.queries { + successful += match self.client.query(query).await { + Ok(_) => 1, + Err(err) => { + println!("Failed to execute query: {}", err); + 0 + } } - Err(err) => println!("Failed to submit points: {}", err), } + + if successful > 0 { + println!("----- {} submitted -----", successful); + } + + self.queries.clear(); + self.oldest_data = None; } } } impl InfluxSink { - pub fn from_config(config: &InfluxConfig) -> Box { - let client = Client::new( - config.host.parse().unwrap(), - config.database.to_owned() - ).set_authentication(config.user.to_owned(), config.pass.to_owned()); + pub fn from_config(config: &InfluxConfig) -> Box { + let client = Client::new(&config.host, &config.database) + .with_auth(&config.user, &config.pass); + let queries = vec![]; let bulk_time = Duration::seconds(config.bulk_time); - let points = Points::create_new(Vec::new()); + let oldest_data: Option> = None; + Box::new(InfluxSink { client, bulk_time, - points, + queries, + oldest_data, }) } } diff --git a/src/sink/mod.rs b/src/sink/mod.rs index 064fcbb..a6f6bed 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -20,7 +20,7 @@ pub enum SinkConfig { #[async_trait] pub trait Sink { - fn add_measurement(&mut self, _: &Measurement); + async fn add_measurement(&mut self, _: &Measurement); async fn submit(&mut self); } @@ -41,7 +41,7 @@ impl fmt::Display for Value { } } -pub fn from_config(sink_configs: &Vec) -> Vec> { +pub fn from_config(sink_configs: &Vec) -> Vec> { let mut sinks = Vec::new(); for sink_config in sink_configs { sinks.push(match sink_config { diff --git a/src/sink/print.rs b/src/sink/print.rs index bc5ecfc..2039296 100644 --- a/src/sink/print.rs +++ b/src/sink/print.rs @@ -12,7 +12,7 @@ pub struct PrintSink { #[async_trait] impl Sink for PrintSink { - fn add_measurement(&mut self, measurement: &super::Measurement) { + async fn add_measurement(&mut self, measurement: &super::Measurement) { self.points.push(measurement.to_owned()); } @@ -25,7 +25,7 @@ impl Sink for PrintSink { } impl PrintSink { - pub fn from_config(_config: &PrintConfig) -> Box { + pub fn from_config(_config: &PrintConfig) -> Box { Box::new(PrintSink { points: Vec::new() }) } }