Skip to content

Commit

Permalink
Controller: Include the object being reconciled in the error_policy (
Browse files Browse the repository at this point in the history
…#995)

Include the object being reconciled in the error policy

Signed-off-by: Felipe Sere <github@felipesere.com>

Signed-off-by: Felipe Sere <github@felipesere.com>
Co-authored-by: Eirik A <sszynrae@gmail.com>
  • Loading branch information
felipesere and clux committed Sep 1, 2022
1 parent a18fcb8 commit 57d97ce
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 13 deletions.
2 changes: 1 addition & 1 deletion examples/configmapgen_controller.rs
Expand Up @@ -67,7 +67,7 @@ async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Arc<Data>) -> Result
}

/// The controller triggers this on reconcile errors
fn error_policy(_error: &Error, _ctx: Arc<Data>) -> Action {
fn error_policy(_object: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<Data>) -> Action {
Action::requeue(Duration::from_secs(1))
}

Expand Down
2 changes: 1 addition & 1 deletion examples/crd_api.rs
Expand Up @@ -120,7 +120,7 @@ async fn main() -> Result<()> {

// Create Foo qux with status
info!("Create Foo instance qux");
let mut f2 = Foo::new("qux", FooSpec {
let f2 = Foo::new("qux", FooSpec {
name: "qux".into(),
replicas: 0,
info: "unpatched qux".into(),
Expand Down
2 changes: 1 addition & 1 deletion examples/secret_syncer.rs
Expand Up @@ -100,7 +100,7 @@ async fn main() -> anyhow::Result<()> {
.await
}
},
|_err, _| Action::requeue(Duration::from_secs(2)),
|_obj, _err, _| Action::requeue(Duration::from_secs(2)),
Arc::new(()),
)
.for_each(|msg| async move { info!("Reconciled: {:?}", msg) })
Expand Down
21 changes: 11 additions & 10 deletions kube-runtime/src/controller/mod.rs
Expand Up @@ -219,7 +219,7 @@ const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
Expand Down Expand Up @@ -274,13 +274,13 @@ where
object.reason = %request.reason
);
reconciler_span
.in_scope(|| reconciler(obj, context.clone()))
.in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
.into_future()
.then(move |res| {
let error_policy = error_policy;
RescheduleReconciliation::new(
res,
|err| error_policy(err, error_policy_ctx),
|err| error_policy(obj, err, error_policy_ctx),
request.obj_ref.clone(),
scheduler_tx,
)
Expand Down Expand Up @@ -421,8 +421,9 @@ where
/// // see configmapgen_controller example for full info
/// Ok(Action::requeue(Duration::from_secs(300)))
/// }
/// /// an error handler that will be called when the reconciler fails
/// fn error_policy(_error: &Error, _ctx: Arc<()>) -> Action {
/// /// an error handler that will be called when the reconciler fails with access to both the
/// /// object that caused the failure and the actual error
/// fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
/// Action::requeue(Duration::from_secs(60))
/// }
///
Expand Down Expand Up @@ -675,7 +676,7 @@ where
/// println!("Reconciling {}", o.name());
/// Ok(Action::await_change())
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
/// Arc::new(()),
/// );
/// # };
Expand Down Expand Up @@ -730,7 +731,7 @@ where
/// println!("Reconciling {}", o.name());
/// Ok(Action::await_change())
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// |_, err: &Infallible, _| Err(err).unwrap(),
/// Arc::new(()),
/// );
/// # };
Expand Down Expand Up @@ -808,7 +809,7 @@ where
pub fn run<ReconcilerFut, Ctx>(
self,
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl Fn(&ReconcilerFut::Error, Arc<Ctx>) -> Action,
error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
where
Expand Down Expand Up @@ -864,7 +865,7 @@ mod tests {
assert_send(
Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
|_, _| async { Ok(mock_type::<Action>()) },
|_: &std::io::Error, _| mock_type::<Action>(),
|_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
Arc::new(()),
),
);
Expand All @@ -891,7 +892,7 @@ mod tests {
Ok(Action::requeue(Duration::ZERO))
})
},
|_: &Infallible, _| todo!(),
|_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
Arc::new(()),
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
Expand Down

0 comments on commit 57d97ce

Please sign in to comment.