Skip to content

Commit

Permalink
Don't re-patch pods that are already controlled by current worker (#2…
Browse files Browse the repository at this point in the history
…6778)

After the scheduler has launched many pods, it keeps trying to
re-adopt them by patching every pod. Each patch-operation
involves a remote API-call which can be be very slow.
In the meantime the scheduler can not do anything else.

By ignoring the pods that already have the expected label,
the list query-result will be shorter and the number of
patch-queries much less.

We had an unlucky moment in our environment, where each
patch-operation started taking 100ms each, with 200 pods in
flight it accumulates into 20 seconds of blocked scheduler.
  • Loading branch information
hterik committed Oct 18, 2022
1 parent 8efb678 commit 27ec562
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions airflow/executors/kubernetes_executor.py
Expand Up @@ -759,14 +759,15 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
"""
if not self.scheduler_job_id:
raise AirflowException(NOT_STARTED_MESSAGE)
new_worker_id_label = pod_generator.make_safe_label_value(self.scheduler_job_id)
kwargs = {
'field_selector': "status.phase=Succeeded",
'label_selector': 'kubernetes_executor=True',
'label_selector': f'kubernetes_executor=True,airflow-worker!={new_worker_id_label}',
}
pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
for pod in pod_list.items:
self.log.info("Attempting to adopt pod %s", pod.metadata.name)
pod.metadata.labels['airflow-worker'] = pod_generator.make_safe_label_value(self.scheduler_job_id)
pod.metadata.labels['airflow-worker'] = new_worker_id_label
try:
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
Expand Down

0 comments on commit 27ec562

Please sign in to comment.