From ee690983e29dd43a9b0f7bf6e074dbbcb14c9df1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Teo=20Klestrup=20R=C3=B6ijezon?= Date: Fri, 1 Jan 2021 22:05:28 +0100 Subject: [PATCH] Port kube-runtime to Tokio 1.0 See #339 --- examples/configmap_reflector.rs | 2 +- examples/crd_api.rs | 6 ++-- examples/crd_derive_no_schema.rs | 6 ++-- examples/crd_reflector.rs | 2 +- examples/deployment_reflector.rs | 2 +- examples/node_reflector.rs | 2 +- examples/secret_reflector.rs | 2 +- kube-runtime/Cargo.toml | 29 ++++++++++--------- .../src/controller/future_hash_map.rs | 2 +- kube-runtime/src/controller/runner.rs | 4 +-- kube-runtime/src/reflector/mod.rs | 4 +-- kube-runtime/src/reflector/store.rs | 2 +- kube-runtime/src/scheduler.rs | 19 +++++------- 13 files changed, 41 insertions(+), 41 deletions(-) diff --git a/examples/configmap_reflector.rs b/examples/configmap_reflector.rs index 862dbd316..7f059c469 100644 --- a/examples/configmap_reflector.rs +++ b/examples/configmap_reflector.rs @@ -11,7 +11,7 @@ fn spawn_periodic_reader(reader: Store) { tokio::spawn(async move { loop { // Periodically read our state - tokio::time::delay_for(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; let cms: Vec<_> = reader.state().iter().map(|obj| Meta::name(obj).clone()).collect(); info!("Current configmaps: {:?}", cms); } diff --git a/examples/crd_api.rs b/examples/crd_api.rs index 3e9cff8f2..7c49169d9 100644 --- a/examples/crd_api.rs +++ b/examples/crd_api.rs @@ -4,7 +4,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::json; use std::time::Duration; -use tokio::time::delay_for; +use tokio::time::sleep; use apiexts::CustomResourceDefinition; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1beta1 as apiexts; @@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> { }) }); // Wait for the delete to take place (map-left case or delete from previous run) - delay_for(Duration::from_secs(2)).await; + sleep(Duration::from_secs(2)).await; // Create the CRD so we can create Foos in kube let foocrd = Foo::crd(); @@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> { Err(e) => return Err(e.into()), // any other case is probably bad } // Wait for the api to catch up - delay_for(Duration::from_secs(1)).await; + sleep(Duration::from_secs(1)).await; // Manage the Foo CR let foos: Api = Api::namespaced(client.clone(), &namespace); diff --git a/examples/crd_derive_no_schema.rs b/examples/crd_derive_no_schema.rs index af2156187..ec949d5e1 100644 --- a/examples/crd_derive_no_schema.rs +++ b/examples/crd_derive_no_schema.rs @@ -1,8 +1,9 @@ +#[cfg(not(feature = "schema"))] use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::{ CustomResourceDefinition, CustomResourceValidation, JSONSchemaProps, }; -use kube_derive::CustomResource; -use serde::{Deserialize, Serialize}; +#[cfg(not(feature = "schema"))] use kube_derive::CustomResource; +#[cfg(not(feature = "schema"))] use serde::{Deserialize, Serialize}; /// CustomResource with manually implemented schema /// @@ -17,6 +18,7 @@ pub struct MyBar { bars: u32, } +#[cfg(not(feature = "schema"))] const MANUAL_SCHEMA: &'static str = r#" type: object properties: diff --git a/examples/crd_reflector.rs b/examples/crd_reflector.rs index 65e50dcb4..40c38c33a 100644 --- a/examples/crd_reflector.rs +++ b/examples/crd_reflector.rs @@ -32,7 +32,7 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(async move { loop { // Periodically read our state - tokio::time::delay_for(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; let crds = reader.state().iter().map(Meta::name).collect::>(); info!("Current crds: {:?}", crds); } diff --git a/examples/deployment_reflector.rs b/examples/deployment_reflector.rs index 7c93c5ce8..98c289feb 100644 --- a/examples/deployment_reflector.rs +++ b/examples/deployment_reflector.rs @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { // Periodically read our state let deploys: Vec<_> = reader.state().iter().map(Meta::name).collect(); info!("Current deploys: {:?}", deploys); - tokio::time::delay_for(std::time::Duration::from_secs(30)).await; + tokio::time::sleep(std::time::Duration::from_secs(30)).await; } }); diff --git a/examples/node_reflector.rs b/examples/node_reflector.rs index 91ece8f30..e7e13686a 100644 --- a/examples/node_reflector.rs +++ b/examples/node_reflector.rs @@ -27,7 +27,7 @@ async fn main() -> anyhow::Result<()> { loop { let nodes = reader.state().iter().map(Meta::name).collect::>(); info!("Current {} nodes: {:?}", nodes.len(), nodes); - tokio::time::delay_for(std::time::Duration::from_secs(10)).await; + tokio::time::sleep(std::time::Duration::from_secs(10)).await; } }); diff --git a/examples/secret_reflector.rs b/examples/secret_reflector.rs index c80fcdac6..d3baca487 100644 --- a/examples/secret_reflector.rs +++ b/examples/secret_reflector.rs @@ -42,7 +42,7 @@ fn spawn_periodic_reader(reader: Store) { .map(|s| format!("{}: {:?}", Meta::name(s), decode(s).keys())) .collect(); info!("Current secrets: {:?}", cms); - tokio::time::delay_for(std::time::Duration::from_secs(15)).await; + tokio::time::sleep(std::time::Duration::from_secs(15)).await; } }); } diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 308862836..65e3fc922 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -13,15 +13,16 @@ categories = ["web-programming::http-client"] edition = "2018" [dependencies] -futures = "0.3.5" +futures = "0.3.8" kube = { path = "../kube", version = "^0.45.0", default-features = false } derivative = "2.1.1" -serde = "1.0.115" -smallvec = "1.4.2" -pin-project = "0.4.23" -tokio = { version = "0.2.21", features = ["time"] } -snafu = { version = "0.6.8", features = ["futures"] } -dashmap = "3.11.10" +serde = "1.0.118" +smallvec = "1.6.0" +pin-project = "1.0.2" +tokio = { version = "1.0.1", features = ["time"] } +snafu = { version = "0.6.10", features = ["futures"] } +dashmap = "4.0.1" +tokio-util = { version = "0.6.0", features = ["time"] } [dependencies.k8s-openapi] version = "0.10.0" @@ -32,14 +33,14 @@ default = ["native-tls"] native-tls = ["kube/native-tls"] rustls-tls = ["kube/rustls-tls"] +[dev-dependencies] +kube-derive = { path = "../kube-derive", version = "^0.45.0"} +serde_json = "1.0.61" +tokio = { version = "1.0.1", features = ["full", "test-util"] } +rand = "0.8.0" +schemars = "0.8.0" + [dev-dependencies.k8s-openapi] version = "0.10.0" default-features = false features = ["v1_19"] - -[dev-dependencies] -kube-derive = { path = "../kube-derive", version = "^0.45.0"} -serde_json = "1.0.57" -tokio = { version = "0.2.22", features = ["full", "test-util"] } -rand = "0.7.3" -schemars = "0.8.0" diff --git a/kube-runtime/src/controller/future_hash_map.rs b/kube-runtime/src/controller/future_hash_map.rs index 7f2b35ef6..92dc7fced 100644 --- a/kube-runtime/src/controller/future_hash_map.rs +++ b/kube-runtime/src/controller/future_hash_map.rs @@ -85,7 +85,7 @@ mod tests { fhm.insert(i, future::ready(i)); } let mut values = fhm.collect::>().await; - values.sort(); + values.sort_unstable(); assert_eq!(values, (0..count).collect::>()); } diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs index e20fef0bc..a67de8955 100644 --- a/kube-runtime/src/controller/runner.rs +++ b/kube-runtime/src/controller/runner.rs @@ -88,7 +88,7 @@ mod tests { use crate::scheduler::{scheduler, ScheduleRequest}; use futures::{channel::mpsc, poll, SinkExt, TryStreamExt}; use std::{cell::RefCell, time::Duration}; - use tokio::time::{delay_for, pause, Instant}; + use tokio::time::{pause, sleep, Instant}; #[tokio::test] async fn runner_should_never_run_two_instances_at_once() { @@ -102,7 +102,7 @@ mod tests { // Panic if this ref is already held, to simulate some unsafe action.. let mutex_ref = rc.borrow_mut(); Box::pin(async move { - delay_for(Duration::from_secs(1)).await; + sleep(Duration::from_secs(1)).await; drop(mutex_ref); }) }) diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 8fd07679c..a2ec05ff2 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -149,13 +149,13 @@ mod tests { #[tokio::test] async fn reflector_store_should_not_contain_duplicates() { let mut rng = rand::thread_rng(); - let item_dist = Uniform::new(0u8, 100); + let item_dist = Uniform::new(0_u8, 100); let deleted_dist = Bernoulli::new(0.40).unwrap(); let store_w = store::Writer::default(); let store = store_w.as_reader(); reflector( store_w, - stream::iter((0u32..100000).map(|gen| { + stream::iter((0_u32..100_000).map(|gen| { let item = rng.sample(item_dist); let deleted = rng.sample(deleted_dist); let obj = ConfigMap { diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index 72a34c535..77e9a11f2 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -133,7 +133,7 @@ mod tests { let mut cluster_cm = cm.clone(); cluster_cm.metadata.namespace = None; let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone())); + store_w.apply_watcher_event(&watcher::Event::Applied(cm)); let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None); } diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index 656f9a540..64a999e43 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -10,17 +10,14 @@ use std::{ pin::Pin, task::{Context, Poll}, }; -use tokio::time::{ - self, - delay_queue::{self, DelayQueue}, - Instant, -}; +use tokio::time::{self, Instant}; +use tokio_util::time::delay_queue::{self, DelayQueue}; #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("timer failure: {}", source))] TimerError { - source: time::Error, + source: time::error::Error, backtrace: Backtrace, }, } @@ -102,7 +99,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { &mut self, cx: &mut Context<'_>, can_take_message: impl Fn(&T) -> bool, - ) -> Poll>> { + ) -> Poll>> { if let Some(msg) = self.pending.iter().find(|msg| can_take_message(*msg)).cloned() { return Poll::Ready(Some(Ok(self.pending.take(&msg).unwrap()))); } @@ -344,7 +341,7 @@ mod tests { ])); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; - assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ()); + scheduler.next().now_or_never().unwrap().unwrap().unwrap(); // Stream has terminated assert!(scheduler.next().await.is_none()); } @@ -364,7 +361,7 @@ mod tests { ])); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; - assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ()); + scheduler.next().now_or_never().unwrap().unwrap().unwrap(); // Stream has terminated assert!(scheduler.next().await.is_none()); } @@ -383,7 +380,7 @@ mod tests { .unwrap(); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; - assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ()); + scheduler.next().now_or_never().unwrap().unwrap().unwrap(); assert!(poll!(scheduler.next()).is_pending()); schedule_tx .send(ScheduleRequest { @@ -394,7 +391,7 @@ mod tests { .unwrap(); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; - assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ()); + scheduler.next().now_or_never().unwrap().unwrap().unwrap(); assert!(poll!(scheduler.next()).is_pending()); } }