Port kube-runtime to Tokio 1.0
nightkr committed Jan 1, 2021
1 parent deff4b4 commit ee69098
Showing 13 changed files with 41 additions and 41 deletions.
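Most of this port is a mechanical rename: Tokio 1.0 calls the old tokio::time::delay_for primitive tokio::time::sleep, moves DelayQueue out of the tokio crate into tokio-util, and nests the timer error type under tokio::time::error. A minimal sketch of the rename on a Tokio 1.x runtime (a standalone illustration, not part of the diff):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // Tokio 0.2 spelled this `tokio::time::delay_for(...)`.
        tokio::time::sleep(Duration::from_secs(1)).await;
        println!("one second elapsed");
    }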
2 changes: 1 addition & 1 deletion examples/configmap_reflector.rs
@@ -11,7 +11,7 @@ fn spawn_periodic_reader(reader: Store<ConfigMap>) {
tokio::spawn(async move {
loop {
// Periodically read our state
- tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
+ tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let cms: Vec<_> = reader.state().iter().map(|obj| Meta::name(obj).clone()).collect();
info!("Current configmaps: {:?}", cms);
}
6 changes: 3 additions & 3 deletions examples/crd_api.rs
@@ -4,7 +4,7 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::time::Duration;
- use tokio::time::delay_for;
+ use tokio::time::sleep;

use apiexts::CustomResourceDefinition;
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1beta1 as apiexts;
@@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> {
})
});
// Wait for the delete to take place (map-left case or delete from previous run)
- delay_for(Duration::from_secs(2)).await;
+ sleep(Duration::from_secs(2)).await;

// Create the CRD so we can create Foos in kube
let foocrd = Foo::crd();
@@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> {
Err(e) => return Err(e.into()), // any other case is probably bad
}
// Wait for the api to catch up
- delay_for(Duration::from_secs(1)).await;
+ sleep(Duration::from_secs(1)).await;

// Manage the Foo CR
let foos: Api<Foo> = Api::namespaced(client.clone(), &namespace);
6 changes: 4 additions & 2 deletions examples/crd_derive_no_schema.rs
@@ -1,8 +1,9 @@
+ #[cfg(not(feature = "schema"))]
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::{
CustomResourceDefinition, CustomResourceValidation, JSONSchemaProps,
};
- use kube_derive::CustomResource;
- use serde::{Deserialize, Serialize};
+ #[cfg(not(feature = "schema"))] use kube_derive::CustomResource;
+ #[cfg(not(feature = "schema"))] use serde::{Deserialize, Serialize};

/// CustomResource with manually implemented schema
///
@@ -17,6 +18,7 @@ pub struct MyBar {
bars: u32,
}

+ #[cfg(not(feature = "schema"))]
const MANUAL_SCHEMA: &'static str = r#"
type: object
properties:
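The crd_derive_no_schema change above is not Tokio-related: it puts the imports and the manual schema constant behind the same #[cfg(not(feature = "schema"))] gate as the code that uses them, presumably so builds with the "schema" feature enabled do not warn about unused items. A minimal sketch of the pattern, with made-up feature and item names:

    // Only compiled when the (hypothetical) `extra` feature is disabled,
    // mirroring the cfg on the code that uses the import.
    #[cfg(not(feature = "extra"))]
    use std::collections::HashMap;

    #[cfg(not(feature = "extra"))]
    fn fallback_lookup() -> HashMap<String, String> {
        HashMap::new()
    }

    fn main() {
        #[cfg(not(feature = "extra"))]
        println!("fallback entries: {}", fallback_lookup().len());
    }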
2 changes: 1 addition & 1 deletion examples/crd_reflector.rs
@@ -32,7 +32,7 @@ async fn main() -> anyhow::Result<()> {
tokio::spawn(async move {
loop {
// Periodically read our state
- tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
+ tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let crds = reader.state().iter().map(Meta::name).collect::<Vec<_>>();
info!("Current crds: {:?}", crds);
}
2 changes: 1 addition & 1 deletion examples/deployment_reflector.rs
@@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
// Periodically read our state
let deploys: Vec<_> = reader.state().iter().map(Meta::name).collect();
info!("Current deploys: {:?}", deploys);
- tokio::time::delay_for(std::time::Duration::from_secs(30)).await;
+ tokio::time::sleep(std::time::Duration::from_secs(30)).await;
}
});

2 changes: 1 addition & 1 deletion examples/node_reflector.rs
@@ -27,7 +27,7 @@ async fn main() -> anyhow::Result<()> {
loop {
let nodes = reader.state().iter().map(Meta::name).collect::<Vec<_>>();
info!("Current {} nodes: {:?}", nodes.len(), nodes);
- tokio::time::delay_for(std::time::Duration::from_secs(10)).await;
+ tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
});

2 changes: 1 addition & 1 deletion examples/secret_reflector.rs
@@ -42,7 +42,7 @@ fn spawn_periodic_reader(reader: Store<Secret>) {
.map(|s| format!("{}: {:?}", Meta::name(s), decode(s).keys()))
.collect();
info!("Current secrets: {:?}", cms);
- tokio::time::delay_for(std::time::Duration::from_secs(15)).await;
+ tokio::time::sleep(std::time::Duration::from_secs(15)).await;
}
});
}
29 changes: 15 additions & 14 deletions kube-runtime/Cargo.toml
@@ -13,15 +13,16 @@ categories = ["web-programming::http-client"]
edition = "2018"

[dependencies]
futures = "0.3.5"
futures = "0.3.8"
kube = { path = "../kube", version = "^0.45.0", default-features = false }
derivative = "2.1.1"
serde = "1.0.115"
smallvec = "1.4.2"
pin-project = "0.4.23"
tokio = { version = "0.2.21", features = ["time"] }
snafu = { version = "0.6.8", features = ["futures"] }
dashmap = "3.11.10"
serde = "1.0.118"
smallvec = "1.6.0"
pin-project = "1.0.2"
tokio = { version = "1.0.1", features = ["time"] }
snafu = { version = "0.6.10", features = ["futures"] }
dashmap = "4.0.1"
tokio-util = { version = "0.6.0", features = ["time"] }

[dependencies.k8s-openapi]
version = "0.10.0"
@@ -32,14 +33,14 @@ default = ["native-tls"]
native-tls = ["kube/native-tls"]
rustls-tls = ["kube/rustls-tls"]

+ [dev-dependencies]
+ kube-derive = { path = "../kube-derive", version = "^0.45.0"}
+ serde_json = "1.0.61"
+ tokio = { version = "1.0.1", features = ["full", "test-util"] }
+ rand = "0.8.0"
+ schemars = "0.8.0"

[dev-dependencies.k8s-openapi]
version = "0.10.0"
default-features = false
features = ["v1_19"]

- [dev-dependencies]
- kube-derive = { path = "../kube-derive", version = "^0.45.0"}
- serde_json = "1.0.57"
- tokio = { version = "0.2.22", features = ["full", "test-util"] }
- rand = "0.7.3"
- schemars = "0.8.0"
2 changes: 1 addition & 1 deletion kube-runtime/src/controller/future_hash_map.rs
@@ -85,7 +85,7 @@ mod tests {
fhm.insert(i, future::ready(i));
}
let mut values = fhm.collect::<Vec<u16>>().await;
- values.sort();
+ values.sort_unstable();
assert_eq!(values, (0..count).collect::<Vec<u16>>());
}

4 changes: 2 additions & 2 deletions kube-runtime/src/controller/runner.rs
@@ -88,7 +88,7 @@ mod tests {
use crate::scheduler::{scheduler, ScheduleRequest};
use futures::{channel::mpsc, poll, SinkExt, TryStreamExt};
use std::{cell::RefCell, time::Duration};
- use tokio::time::{delay_for, pause, Instant};
+ use tokio::time::{pause, sleep, Instant};

#[tokio::test]
async fn runner_should_never_run_two_instances_at_once() {
@@ -102,7 +102,7 @@ mod tests {
// Panic if this ref is already held, to simulate some unsafe action..
let mutex_ref = rc.borrow_mut();
Box::pin(async move {
- delay_for(Duration::from_secs(1)).await;
+ sleep(Duration::from_secs(1)).await;
drop(mutex_ref);
})
})
4 changes: 2 additions & 2 deletions kube-runtime/src/reflector/mod.rs
@@ -149,13 +149,13 @@ mod tests {
#[tokio::test]
async fn reflector_store_should_not_contain_duplicates() {
let mut rng = rand::thread_rng();
- let item_dist = Uniform::new(0u8, 100);
+ let item_dist = Uniform::new(0_u8, 100);
let deleted_dist = Bernoulli::new(0.40).unwrap();
let store_w = store::Writer::default();
let store = store_w.as_reader();
reflector(
store_w,
- stream::iter((0u32..100000).map(|gen| {
+ stream::iter((0_u32..100_000).map(|gen| {
let item = rng.sample(item_dist);
let deleted = rng.sample(deleted_dist);
let obj = ConfigMap {
2 changes: 1 addition & 1 deletion kube-runtime/src/reflector/store.rs
@@ -133,7 +133,7 @@ mod tests {
let mut cluster_cm = cm.clone();
cluster_cm.metadata.namespace = None;
let mut store_w = Writer::default();
- store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone()));
+ store_w.apply_watcher_event(&watcher::Event::Applied(cm));
let store = store_w.as_reader();
assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None);
}
19 changes: 8 additions & 11 deletions kube-runtime/src/scheduler.rs
@@ -10,17 +10,14 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
- use tokio::time::{
- self,
- delay_queue::{self, DelayQueue},
- Instant,
- };
+ use tokio::time::{self, Instant};
+ use tokio_util::time::delay_queue::{self, DelayQueue};

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("timer failure: {}", source))]
TimerError {
- source: time::Error,
+ source: time::error::Error,
backtrace: Backtrace,
},
}
@@ -102,7 +99,7 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
&mut self,
cx: &mut Context<'_>,
can_take_message: impl Fn(&T) -> bool,
- ) -> Poll<Option<Result<T, time::Error>>> {
+ ) -> Poll<Option<Result<T, time::error::Error>>> {
if let Some(msg) = self.pending.iter().find(|msg| can_take_message(*msg)).cloned() {
return Poll::Ready(Some(Ok(self.pending.take(&msg).unwrap())));
}
@@ -344,7 +341,7 @@ mod tests {
]));
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
- assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ());
+ scheduler.next().now_or_never().unwrap().unwrap().unwrap();
// Stream has terminated
assert!(scheduler.next().await.is_none());
}
@@ -364,7 +361,7 @@
]));
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
- assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ());
+ scheduler.next().now_or_never().unwrap().unwrap().unwrap();
// Stream has terminated
assert!(scheduler.next().await.is_none());
}
Expand All @@ -383,7 +380,7 @@ mod tests {
.unwrap();
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
- assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ());
+ scheduler.next().now_or_never().unwrap().unwrap().unwrap();
assert!(poll!(scheduler.next()).is_pending());
schedule_tx
.send(ScheduleRequest {
@@ -394,7 +391,7 @@
.unwrap();
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
- assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().unwrap(), ());
+ scheduler.next().now_or_never().unwrap().unwrap().unwrap();
assert!(poll!(scheduler.next()).is_pending());
}
}
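The scheduler tests keep using Tokio's test clock (pause and advance), which survives the 0.2 to 1.0 move under the test-util feature. A minimal sketch of that pattern, assuming a dev-dependency on tokio with the "full" and "test-util" features as in the Cargo.toml above:

    use std::time::Duration;
    use tokio::time::{advance, pause, sleep, Instant};

    #[tokio::test]
    async fn clock_can_be_driven_manually() {
        pause(); // freeze the runtime clock
        let start = Instant::now();
        let waiting = sleep(Duration::from_secs(5));
        tokio::pin!(waiting);
        advance(Duration::from_secs(5)).await; // jump the mock clock forward
        waiting.await; // completes without any real waiting
        assert!(start.elapsed() >= Duration::from_secs(5));
    }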
