From 12349a1003f913f744cb863b3bd4a40cb2f54c07 Mon Sep 17 00:00:00 2001
From: Kaxil Naik
Date: Tue, 13 Jul 2021 00:55:40 +0100
Subject: [PATCH] [astro] Handle Istio containers with Kubernetes Executor Pod
 adoption (#1318)

closes https://github.com/astronomer/issues/issues/3030

> This edge case deals specifically with task instances that ended in the
> UP_FOR_RETRY state when a scheduler is adopting orphaned tasks.

Generally, this issue does not affect OSS Airflow, since the templated
Kubernetes worker pods it spawns have no additional containers that would
prevent the pod from reaching the Succeeded phase. Pods in the Succeeded
phase are handled by the scheduler's adoption process in
_adopt_completed_pods(). Since Astronomer's Kubernetes worker pods have an
additional container (istio-proxy), they stay in the NotReady state when
tasks are not killed, and so they are not eligible for adoption.

This can also happen for "completed" pods that have sidecars. It is the same
process, just a slightly different scenario: if a worker finishes while not
being watched by a scheduler, it never gets adopted by another scheduler in
_adopt_completed_pods() because the pod is still 'Running'; but the TI also
isn't in a resettable state, so scheduler_job never asks the executor to
adopt it! It is in limbo: "complete" in Airflow's view (based on TI state)
but "Running" in Kubernetes' view (since the sidecar is still running).

This commit reuses the existing Istio code and handles those pods.

(cherry picked from commit 3f309b057a9cfb59293c130dedff400e3e1a9a52)
---
 airflow/executors/kubernetes_executor.py | 17 +++++++++++++++++
 airflow/kubernetes/istio.py              |  6 +++++-
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index a17d7a756a64f..e95f1f80b1593 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -652,6 +652,7 @@ def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
         for pod in pod_list.items:
             self.adopt_launched_task(kube_client, pod, pod_ids)
         self._adopt_completed_pods(kube_client)
+        self._handle_zombied_istio_pods(kube_client)
         tis_to_flush.extend(pod_ids.values())
         return tis_to_flush
 
@@ -711,6 +712,22 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
             except ApiException as e:
                 self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
 
+    def _handle_zombied_istio_pods(self, kube_client: client.CoreV1Api) -> None:
+        """
+        Handle zombie pods that stay in the Running phase because the istio-proxy
+        container keeps running after the base container (the Airflow task) completes.
+
+        :param kube_client: kubernetes client for speaking to the kube API
+        """
+        kwargs = {
+            'field_selector': "status.phase=Running",
+            'label_selector': 'kubernetes_executor=True',
+        }
+        pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
+        istio = Istio(kube_client=kube_client)
+        for pod in pod_list.items:
+            istio.handle_istio_proxy(pod)
+
     def _flush_task_queue(self) -> None:
         if not self.task_queue:
             raise AirflowException(NOT_STARTED_MESSAGE)

diff --git a/airflow/kubernetes/istio.py b/airflow/kubernetes/istio.py
index 3a6875cef387f..766fe0d1c722d 100644
--- a/airflow/kubernetes/istio.py
+++ b/airflow/kubernetes/istio.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from kubernetes.client.rest import ApiException
 from kubernetes.stream import stream
 from packaging.version import parse as semantic_version
 
@@ -54,7 +55,10 @@ def handle_istio_proxy(self, pod) -> bool:
                 "pod name: %s",
                 pod.metadata.name,
             )
-            self._shutdown_istio_proxy(pod)
+            try:
+                self._shutdown_istio_proxy(pod)
+            except ApiException as e:
+                self.log.warning("Error handling Istio container for pod %s: %s", pod.metadata.name, e)
             return True
         return False
 
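
Note: for context, below is a minimal sketch of the kind of detection and
sidecar shutdown that handle_istio_proxy() / _shutdown_istio_proxy() perform.
It is an illustration under assumptions, not the code in
airflow/kubernetes/istio.py: the container names ("istio-proxy", "base") and
pilot-agent's shutdown endpoint (POST to /quitquitquit on port 15020) are
Istio and Airflow defaults, and the actual implementation varies with the
Istio version (hence the semantic_version import above).

    # Hypothetical sketch; names, port, and endpoint are assumed defaults,
    # not copied from airflow/kubernetes/istio.py.
    from kubernetes import client
    from kubernetes.stream import stream

    ISTIO_CONTAINER_NAME = "istio-proxy"  # assumption: default Istio sidecar name
    BASE_CONTAINER_NAME = "base"          # assumption: Airflow's task container name

    def base_container_is_done(pod) -> bool:
        """True if the task container has terminated even though the pod is
        still reported as Running (i.e. a sidecar is keeping it alive)."""
        for status in pod.status.container_statuses or []:
            if status.name == BASE_CONTAINER_NAME:
                return status.state.terminated is not None
        return False

    def shutdown_istio_proxy(kube_client: client.CoreV1Api, pod) -> None:
        """Ask pilot-agent inside the istio-proxy container to exit by
        POSTing to its /quitquitquit endpoint (15020 is the Istio default)."""
        stream(
            kube_client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=ISTIO_CONTAINER_NAME,
            command=["/bin/sh", "-c", "curl -fsS -X POST http://127.0.0.1:15020/quitquitquit"],
            stdout=True,
            stderr=True,
            stdin=False,
            tty=False,
        )

Once the sidecar exits, the pod can move to the Succeeded phase, at which
point the scheduler's normal _adopt_completed_pods() path can pick it up.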