Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KubernetesPodOperator callback example from Doc doesn't work #39291

Open
1 of 2 tasks
owler opened this issue Apr 27, 2024 · 2 comments
Open
1 of 2 tasks

KubernetesPodOperator callback example from Doc doesn't work #39291

owler opened this issue Apr 27, 2024 · 2 comments
Labels
area:providers kind:bug This is a clearly a bug kind:documentation provider:cncf-kubernetes Kubernetes provider related issues

Comments

@owler
Copy link

owler commented Apr 27, 2024

What do you see as an issue?

https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html#id13
The problem may related to the None value for api_version and kind see. log below

import kubernetes.client as k8s
import kubernetes_asyncio.client as async_k8s

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

class MyCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_creation(*, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs) -> None:
        client.create_namespaced_service(
            namespace=pod.metadata.namespace,
            body=k8s.V1Service(
                metadata=k8s.V1ObjectMeta(
                    name=pod.metadata.name,
                    labels=pod.metadata.labels,
                    owner_references=[
                        k8s.V1OwnerReference(
                            api_version=pod.api_version,
                            kind=pod.kind,
                            name=pod.metadata.name,
                            uid=pod.metadata.uid,
                            controller=True,
                            block_owner_deletion=True,
                        )
                    ],
                ),
                spec=k8s.V1ServiceSpec(
                    selector=pod.metadata.labels,
                    ports=[
                        k8s.V1ServicePort(
                            name="http",
                            port=80,
                            target_port=80,
                        )
                    ],
                ),
            ),
        )

with DAG(
        dag_id='test_dag2',
        schedule="45 * * * *",
        start_date=pendulum.datetime(2024, 3, 26, tz="UTC"),
        catchup=False,
        max_active_runs=3,
        dagrun_timeout=None,
        params={
            "srcPath": "/dimas/test_dataset",
            "partition": "",
            "dstPath": "/shared/dmitry.savenko/kube"
        }
) as dag:

        k = KubernetesPodOperator(
            task_id="test_callback",
            image="alpine",
            cmds=["/bin/sh"],
            arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
            name="test-callback",
            callbacks=MyCallback,
        )
run_this_last = EmptyOperator(
    task_id="run_this_last",
)

k >> run_this_last
 File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/models/v1_owner_reference.py", line 97, in api_version
    raise ValueError("Invalid value for `api_version`, must not be `None`")  # noqa: E501


airflow.exceptions.AirflowException: Pod test-callback-lvmxwlac returned a failure.
remote_pod: {'api_version': None,
 'kind': None,
 'metadata': {'annotations': None,
              'creation_timestamp': datetime.datetime(2024, 4, 27, 13, 13, 25, tzinfo=tzlocal()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'labels': {'airflow_kpo_in_cluster': 'True',
                         'airflow_version': '2.8.4',
                         'dag_id': 'test_dag2',
                         'kubernetes_pod_operator': 'True',
                         'run_id': 'scheduled__2024-04-27T1145000000-ac155a96d',
                         'task_id': 'test_callback',

Solving the problem

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@owler owler added kind:bug This is a clearly a bug kind:documentation needs-triage label for new issues that we didn't triage yet labels Apr 27, 2024
Copy link

boring-cyborg bot commented Apr 27, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@RNHTTR RNHTTR added area:providers pending-response provider:cncf-kubernetes Kubernetes provider related issues and removed pending-response needs-triage label for new issues that we didn't triage yet labels Apr 27, 2024
@RNHTTR
Copy link
Collaborator

RNHTTR commented Apr 30, 2024

This is reproducible via the Astro CLI with the code in the original post.

One odd thing about the docs example is the exit 1 in the command -- I don't think exiting with an error has anything to do with the callback demonstration?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers kind:bug This is a clearly a bug kind:documentation provider:cncf-kubernetes Kubernetes provider related issues
Projects
None yet
Development

No branches or pull requests

2 participants