[astro] Handle Istio containers with Kubernetes Executor Pod adoption (#1318)

closes astronomer/issues#3030

>This edge case deals specifically with task instances that ended in the UP_FOR_RETRY state while a scheduler was adopting orphaned tasks. Generally, this issue does not affect OSS Airflow, since the templated Kubernetes worker pods it spawns don't have additional containers that would prevent a pod from reaching the Succeeded state. Pods in the Succeeded state are handled by the scheduler's adoption process in _adopt_completed_pods().

Since Astronomer's Kubernetes worker pods have an additional container (istio-proxy), they sit in the NotReady state when tasks are not killed, and they are not eligible for adoption.

This can also happen for "completed" pods that have sidecars. It is the same process in a slightly different scenario: if a worker finishes while not being watched by a scheduler, it never gets adopted by another scheduler in _adopt_completed_pods() because the pod is still 'Running', but the TI also isn't in a resettable state, so scheduler_job never asks the executor to adopt it. It's in limbo: "complete" in Airflow's view (based on TI state) but "Running" in the Kubernetes view (since the sidecar is still running).

This commit re-uses current Istio code and handles those pods.
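The limbo state described above can be expressed as a small predicate. This is an illustrative, self-contained sketch, not Airflow code: `is_zombie_pod` is a hypothetical helper, and the `base` container name follows the Kubernetes Executor's convention for the container that runs the task.

```python
def is_zombie_pod(phase: str, container_states: dict) -> bool:
    """Return True if a pod is 'zombied': the base container (the Airflow
    task) has terminated, but a sidecar such as istio-proxy keeps running,
    so the pod phase is still 'Running' in Kubernetes.

    container_states maps container name -> 'running' or 'terminated'.
    """
    if phase != "Running":
        # Pods that reached Succeeded are already covered by the
        # scheduler's normal adoption path (_adopt_completed_pods).
        return False
    base_done = container_states.get("base") == "terminated"
    sidecar_running = any(
        state == "running"
        for name, state in container_states.items()
        if name != "base"
    )
    return base_done and sidecar_running


# The worker finished, but istio-proxy keeps the pod 'Running':
print(is_zombie_pod("Running", {"base": "terminated", "istio-proxy": "running"}))  # True
# No sidecar: the pod reaches Succeeded and is adopted normally:
print(is_zombie_pod("Succeeded", {"base": "terminated"}))  # False
```

This is exactly the population the new `_handle_zombied_istio_pods` targets in the diff below: pods selected with `status.phase=Running` whose task work is already done.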

(cherry picked from commit 3f309b0)
kaxil committed Jul 13, 2021
1 parent 74fa132 commit 12349a1
Showing 2 changed files with 22 additions and 1 deletion.
17 changes: 17 additions & 0 deletions airflow/executors/kubernetes_executor.py
@@ -652,6 +652,7 @@ def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]
for pod in pod_list.items:
self.adopt_launched_task(kube_client, pod, pod_ids)
self._adopt_completed_pods(kube_client)
self._handle_zombied_istio_pods(kube_client)
tis_to_flush.extend(pod_ids.values())
return tis_to_flush

@@ -711,6 +712,22 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)

def _handle_zombied_istio_pods(self, kube_client: client.CoreV1Api) -> None:
"""
Handle Zombied pods that are caused because istio container is still running,
while base container (where Airflow task is run) is completed.
:param kube_client: kubernetes client for speaking to kube API
"""
kwargs = {
'field_selector': "status.phase=Running",
'label_selector': 'kubernetes_executor=True',
}
pod_list = kube_client.list_namespaced_pod(namespace=self.kube_config.kube_namespace, **kwargs)
istio = Istio(kube_client=kube_client)
for pod in pod_list.items:
istio.handle_istio_proxy(pod)

def _flush_task_queue(self) -> None:
if not self.task_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
6 changes: 5 additions & 1 deletion airflow/kubernetes/istio.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

from kubernetes.client.rest import ApiException
from kubernetes.stream import stream
from packaging.version import parse as semantic_version

@@ -54,7 +55,10 @@ def handle_istio_proxy(self, pod) -> bool:
"pod name: %s",
pod.metadata.name,
)
self._shutdown_istio_proxy(pod)
try:
self._shutdown_istio_proxy(pod)
except ApiException:
self.log.debug("Error handling Istio container for pod: %s", pod.metadata.name)
return True
return False

