def _handle_zombied_istio_pods(self, kube_client: client.CoreV1Api) -> None:
    """
    Handle zombied pods that are caused because the Istio container is still
    running while the base container (where the Airflow task is run) is completed.

    Lists the pods in the executor's namespace that are in the ``Running``
    phase and carry the ``kubernetes_executor=True`` label, then hands each
    one to ``Istio.handle_istio_proxy``.

    :param kube_client: kubernetes client for speaking to kube API
    """
    kwargs = {
        'field_selector': "status.phase=Running",
        'label_selector': 'kubernetes_executor=True',
    }
    pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
    # Build the Istio helper from the client the caller passed in (the same
    # one used for the listing above) instead of reaching for
    # ``self.kube_client``, so the parameter is honoured consistently.
    istio = Istio(kube_client=kube_client)
    for pod in pod_list.items:
        istio.handle_istio_proxy(pod)
+from kubernetes.client.rest import ApiException from kubernetes.stream import stream from packaging.version import parse as semantic_version @@ -54,7 +55,10 @@ def handle_istio_proxy(self, pod) -> bool: "pod name: %s", pod.metadata.name, ) - self._shutdown_istio_proxy(pod) + try: + self._shutdown_istio_proxy(pod) + except ApiException: + self.log.debug("Error handling Istio container for pod: %s", pod.metadata.name) return True return False