
Airflow 2.2.2 pod_override does not override args of V1Container #27358

Closed
2 tasks done
oneturkmen opened this issue Oct 29, 2022 · 9 comments · Fixed by #27450
Labels
area:core kind:bug This is a clearly a bug

Comments

@oneturkmen

oneturkmen commented Oct 29, 2022

Apache Airflow version

2.2.2

What happened

I have a bash sensor defined as follows:

foo_sensor_task = BashSensor(
    task_id="foo_task",
    poke_interval=3600,
    bash_command="python -m foo.run",
    retries=0,
    executor_config={
        "pod_template_file": "path-to-file-yaml",
        "pod_override": k8s.V1Pod(
              spec=k8s.V1PodSpec(
                  containers=[
                      k8s.V1Container(name="base", image="foo-image", args=["abc"])
                  ]
              )
        )
    }
)

Entrypoint command in the foo-image is python -m foo.run. However, when I deploy the image onto Openshift (Kubernetes), the command somehow turns out to be the following:

python -m foo.run airflow tasks run foo_dag foo_sensor_task manual__2022-10-28T21:08:39+00:00 ...

which is wrong.

What you think should happen instead

I expected pod_override to override args (see the V1Container args value above), so the command should be:

python -m foo.run abc

and not:

python -m foo.run airflow tasks run foo_dag foo_sensor_task manual__2022-10-28T21:08:39+00:00 ...

How to reproduce

To reproduce the above issue, create a simple DAG and a sensor as defined above. Use a sample image and try to override the args. I cannot provide the same code due to NDA.

Operating System

RHLS 7.9

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==2.4.0
apache-airflow-providers-cncf-kubernetes==2.1.0
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-mysql==2.1.1
apache-airflow-providers-sqlite==2.0.1

Deployment

Other

Deployment details

N/A

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@oneturkmen oneturkmen added area:core kind:bug This is a clearly a bug labels Oct 29, 2022
@boring-cyborg

boring-cyborg bot commented Oct 29, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

@oneturkmen
Author

oneturkmen commented Oct 29, 2022

To add on top of the above-mentioned details, I think the issue is perhaps somewhere in these LOCs: https://github.com/apache/airflow/blob/main/airflow/executors/kubernetes_executor.py#L307-L333

But it is precisely here:

The above LOC always overrides args with the worker command, which is basically airflow tasks run. Why do we pass the command as args? Is that expected behavior?

EDIT: Digging further, I see that PodGenerator.construct_pod() actually intends to override, but probably the ordering of the reduce operations is not correct?

pod_list = [base_worker_pod, pod_override_object, dynamic_pod]
try:
    return reduce(PodGenerator.reconcile_pods, pod_list)
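To make the ordering question concrete, here is a toy sketch of how a reduce over a reconcile function behaves; plain dicts stand in for the V1Pod objects, and the simplified reconcile below only approximates PodGenerator.reconcile_pods (which merges field by field, with the later pod winning when its value is set):

```python
from functools import reduce

# Simplified stand-in for PodGenerator.reconcile_pods: for each field,
# the later pod's value wins whenever it is set (not None).
def reconcile(base: dict, override: dict) -> dict:
    merged = dict(base)
    for key, value in override.items():
        if value is not None:
            merged[key] = value
    return merged

# Mirrors pod_list = [base_worker_pod, pod_override_object, dynamic_pod]:
# dynamic_pod comes last, so the args it sets ("airflow tasks run ...")
# win over any args supplied via pod_override.
base_worker_pod = {"image": "airflow:2.2.2", "args": None}
pod_override_object = {"image": "foo-image", "args": ["abc"]}
dynamic_pod = {"image": None, "args": ["airflow", "tasks", "run", "foo_dag"]}

merged = reduce(reconcile, [base_worker_pod, pod_override_object, dynamic_pod])
print(merged)
# image survives from pod_override, but args come from dynamic_pod
```

This illustrates why the pod_override args never reach the container: the dynamically built pod is reconciled last.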

@jedcunningham
Member

Hey @oneturkmen!

That is expected behavior, as the worker uses that to know what task to run. May I ask what you are trying to ultimately achieve by overwriting args?

This should have been documented though, so I've opened #27450 to do that. Thanks.
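Given that behavior, a pattern that works with the executor rather than against it is to use pod_override only for fields the executor doesn't manage (image, resources, labels), leaving command and args unset so KubernetesExecutor can inject its own airflow tasks run arguments. A minimal sketch, with a hypothetical image name:

```python
from kubernetes.client import models as k8s

# Sketch: override the worker image and resources via pod_override, but
# leave command/args unset on purpose so the KubernetesExecutor can set
# its own "airflow tasks run ..." args.
executor_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    image="my-airflow-worker:2.2.2",  # hypothetical image
                    resources=k8s.V1ResourceRequirements(
                        requests={"cpu": "500m", "memory": "512Mi"},
                    ),
                    # no command= / args= here
                )
            ]
        )
    )
}
```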

@oneturkmen
Author

@jedcunningham we wanted a BashSensor task that pings an external service to check whether some file has been generated. If the file isn't there yet, we keep pinging for a while, and only if it's still not ready after that do we fail the task.

That is expected behavior, as the worker uses that to know what task to run.

I did not expect that, because we use KubernetesPodOperator with our custom image built on a plain Python base (i.e., FROM python:3.7 and not FROM airflow:2.2.2), and it seems to work as needed. You can see in the code snippets of the KubernetesPodOperator docs here that we are able to override the image along with the command, which does not have the airflow tasks run command appended. Maybe I'm missing something here.

@jedcunningham
Member

I think you are misunderstanding what KubernetesExecutor is actually doing. KE spins up an Airflow worker pod for every task. In your case, it'll spin up a pod and say "Airflow, run task 'foo_task' for dag 'foo_dag' run_id 'manual__...'" (which matches the args KE sets). That worker will then run your bash_command (in this case), or do whatever else you've asked it to do.

KPO is a different situation. The conceptual "kubectl create pod" is replacing the bash_command, but it still runs from an Airflow worker.

Short version: You want to put all your task specific logic in bash_command when doing a BashSensor. Bonus, this keeps it portable between executors!

I actually gave a talk that covered this at Airflow Summit this year, it's short so might be worth a watch: https://youtu.be/H8JjhiVGOlg
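Applied to the original report, that suggestion means the extra argument moves into bash_command itself and the args override in executor_config is dropped entirely (foo.run and abc are the hypothetical names from above):

```python
from airflow.sensors.bash import BashSensor

# Sketch: keep the task-specific logic in bash_command rather than in
# executor_config, so the same DAG works under any executor.
foo_sensor_task = BashSensor(
    task_id="foo_task",
    poke_interval=3600,
    bash_command="python -m foo.run abc",  # exits 0 once the file exists
    retries=0,
    # no executor_config needed just to pass "abc"
)
```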

@oneturkmen
Author

oneturkmen commented Nov 2, 2022 via email

@devdattakulkarni

@jedcunningham Hello. I stumbled upon this issue while debugging an error. I watched the video you mentioned above and could not find an answer so I thought of asking here. I hope it's okay.

So I have a K8s executor with a custom image. I trigger the dag from Airflow UI by passing in custom parameters using the "trigger DAG w/config" option. I understand these parameters will be accessible to the task via the dag_run dictionary. But I am not able to access the dag_run dictionary. Below is a brief snippet of my task definition.
my_executor_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    image="custom-image",
                    command=["python3", "count_exposure_points.py",
                             {{ dag_run.conf['bucket'] }},
                             {{ dag_run.conf['prefix'] }}],
                )
            ]
        )
    )
}

@task(executor_config=my_executor_config)
def my_task():
    print_stuff()

my_task
I tried dereferencing dag_run as "{{ dag_run }}" (with double quotes), but this just passes the literal string "{{ dag_run }}" to the command. Without the double quotes, the DAG fails to load with a NameError saying dag_run is an unknown name.

Any pointers/suggestions on how to access dag_run in a KubernetesExecutor will be very helpful.

Thank you!

@jedcunningham
Member

Hi @devdattakulkarni,

Generally you should ask this type of stuff on our slack or in a discussion instead of old issues, even if they are sorta related like this one.

Couple things:

  • Don't use executor config to run a separate python script like you are trying to. KE still needs to run the Airflow worker. Do the import and run whatever you are trying to do in your my_task function, or use BashOperator.
  • If you stick with taskflow, you can access conf via the context, as described in the taskflow tutorial. The templating you were attempting doesn't work everywhere, only in attributes listed in templated_fields on your operator. See the Jinja templating section of the operator docs for details.
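The context-based approach can be sketched without any Airflow imports; here SimpleNamespace stands in for the real DagRun object you would get from the task context (via **context or get_current_context()), and the conf keys are the hypothetical ones from this thread:

```python
from types import SimpleNamespace

# Pure-Python sketch of pulling parameters out of a dag_run-style
# context, as the taskflow tutorial describes.
def extract_params(context):
    conf = context["dag_run"].conf or {}
    return conf.get("bucket"), conf.get("prefix")

# Stand-in for the real context an @task function receives.
ctx = {"dag_run": SimpleNamespace(conf={"bucket": "b1", "prefix": "p/"})}
print(extract_params(ctx))  # ('b1', 'p/')
```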

@devdattakulkarni

@jedcunningham Ack on using Slack or discussions for asking questions. Next time will do that.

Thanks a lot for the detailed answers. They are helpful and confirm the solution that I stumbled upon just an hour ago via trial and error.

So now I am creating my custom image by inheriting from the Airflow Worker image. Then in the executor_config, I am not defining any command. Instead, in my_task, I am using context to get the dag_run parameters and then using Python subprocess to invoke the actual command. This setup is working now.
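That setup might be sketched like this (the script name and conf keys are the hypothetical ones from this thread): a helper builds the argv list from dag_run.conf, and subprocess runs it from inside the worker, so no command override is needed in executor_config:

```python
import subprocess

# Build the command line from dag_run.conf values.
def build_command(conf: dict) -> list:
    return ["python3", "count_exposure_points.py",
            conf.get("bucket", ""), conf.get("prefix", "")]

# Called from inside the task, where conf comes from the task context.
def run_count(conf: dict) -> int:
    return subprocess.run(build_command(conf)).returncode
```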

Hopefully, this explanation can help someone else who runs into this issue.

Thank you for the quick reply 💯


3 participants