
Dynamic mapped tasks group arguments are interpreted as MappedArgument when provided to classic operators #39222

Open
florian-guily opened this issue Apr 24, 2024 · 6 comments
Labels
affected_version:2.8 Issues Reported for 2.8 area:dynamic-task-mapping AIP-42 kind:bug This is a clearly a bug

@florian-guily

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.4

What happened?

When a task group is mapped with expand_kwargs, the group's arguments are not resolved when they are used in classic operators inside that group. The operators receive MappedArgument objects instead of the real values.
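A minimal pure-Python sketch of the behaviour described above, assuming (as the logs suggest) that the task group function body runs once when the DAG is parsed, with a placeholder object standing in for each mapped argument. `MappedPlaceholder` and `resolve_value` are hypothetical names for illustration, not Airflow's `MappedArgument` API: a placeholder passed as-is can still be resolved per map index later, but one interpolated into an f-string has already been turned into its repr text.

```python
# Hypothetical model for illustration only; these are not Airflow classes.
class MappedPlaceholder:
    """Stands in for one mapped argument until a map index is known."""

    def __init__(self, key):
        self.key = key

    def resolve(self, instance_kwargs):
        # At run time, look the value up in this mapped instance's kwargs.
        return instance_kwargs[self.key]

    def __repr__(self):
        return f"MappedPlaceholder(key={self.key!r})"


def resolve_value(value, instance_kwargs):
    # Only values that are still placeholders can be resolved later on.
    if isinstance(value, MappedPlaceholder):
        return value.resolve(instance_kwargs)
    return value


# "Parse time": the group body runs once, receiving placeholders.
project = MappedPlaceholder("project")
dataset = MappedPlaceholder("dataset")

passed_as_is = project                   # still a placeholder object
formatted = f"echo {project}.{dataset}"  # placeholders already rendered as text

# "Run time" for one mapped instance:
instance_kwargs = {"project": "my_project", "dataset": "dataset_0"}
print(resolve_value(passed_as_is, instance_kwargs))  # my_project
print(resolve_value(formatted, instance_kwargs))
# echo MappedPlaceholder(key='project').MappedPlaceholder(key='dataset')
```

If this model holds, the placeholder text in the echo command comes from the f-string being evaluated before any map index exists.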

What you think should happen instead?

The real values of the mapped task group's arguments should be passed to the operators.
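For comparison, a plain-Python sketch of the expected semantics (not Airflow code): `expand_eagerly` is a hypothetical helper that calls the group function once per kwargs dict, so every mapped instance formats its command from that dict's real values. The dict keys are assumed to match the function's parameter names.

```python
# Hypothetical model of the expected behaviour; not Airflow code.
def my_tg(project, dataset, table_name, partition_id):
    # The command each mapped instance is expected to run.
    return f"echo {project}.{dataset}.{table_name}${partition_id}"


def expand_eagerly(func, kwargs_list):
    # One call per dict, each receiving that dict's real values.
    return [func(**kwargs) for kwargs in kwargs_list]


partitions = [
    {
        "project": "my_project",
        "dataset": f"dataset_{n}",
        "table_name": "my_table",
        "partition_id": "my_partition_id",
    }
    for n in range(3)
]

commands = expand_eagerly(my_tg, partitions)
for command in commands:
    print(command)
# echo my_project.dataset_0.my_table$my_partition_id
# echo my_project.dataset_1.my_table$my_partition_id
# echo my_project.dataset_2.my_table$my_partition_id
```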

How to reproduce

I originally hit this with a Google Cloud operator in a task group, but I managed to reproduce it with a BashOperator.

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.operators.bash import BashOperator
from pendulum import datetime


dag = DAG(
    dag_id="airflow_issue_test",
    start_date=datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    schedule="@daily",
    default_args={"retries": 2},
)

with dag:

    @task()
    def list_dict_generator():
        # One dict per mapped task group instance; keys match my_tg's parameters.
        my_list = [
            {
                "project": "my_project",
                "dataset": f"dataset_{number}",
                "table_name": "my_table",
                "partition_id": "my_partition_id",
            }
            for number in range(10)
        ]
        return my_list

    @task_group()
    def my_tg(project, dataset, table_name, partition_id):
        BashOperator(
            task_id="bash_task",
            bash_command=f"echo {project}.{dataset}.{table_name}${partition_id}",
            env={"MY_VAR": "Hello World"},
        )

    partitions_to_delete = list_dict_generator()

    my_tg.expand_kwargs(partitions_to_delete)

Here are the associated logs. They clearly show MappedArgument objects, rather than the real values, being passed in the echo command, which should not happen.
dag_id=airflow_issue_test_run_id=scheduled__2024-04-14T00_00_00+00_00_task_id=bq_to_gcs_tg.bash_task_map_index=0_attempt=2.log

Operating System

macOS Sonoma 14.2.1 (23C71)

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.16.0
apache-airflow-providers-common-sql>=1.11.0

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@florian-guily florian-guily added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Apr 24, 2024

boring-cyborg bot commented Apr 24, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; there is no need to wait for approval.

@RNHTTR RNHTTR added area:dynamic-task-mapping AIP-42 and removed area:core needs-triage label for new issues that we didn't triage yet labels Apr 24, 2024
@RNHTTR
Collaborator

RNHTTR commented Apr 24, 2024

@florian-guily Would you like to be assigned to this issue?

@florian-guily
Author

I'd like to, yes. I'll find the time to resolve it!

@eladkal eladkal added the affected_version:2.8 Issues Reported for 2.8 label Apr 30, 2024
@renzo-sanchez-h

renzo-sanchez-h commented May 7, 2024

Hello there, I've just encountered the same issue.
It doesn't only happen with classic operators, but with anything in the task group's scope, such as:

@task_group
def process_model_requests(model_name):
    config_results = prepare_pod_config(task_id=f"config_for_{model_name}")
    result_paths = prepare_pods(config_results, f"for_{model_name}")
    config_results >> result_paths

I'm on version 2.6.3, though.

@florian-guily
Author

Interesting, though not anything in a task group: Python tasks created with the TaskFlow API seem to work well. I haven't tried the classic PythonOperator.
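One possible reading of this difference, sketched as a pure-Python model under the assumption that a TaskFlow `@task` body is not executed at parse time: the call is only recorded, and the body runs later with its arguments already resolved for the map index, so an f-string inside the body sees real values. `DeferredCall`, `MappedPlaceholder` and `build_command` are hypothetical names for illustration, not Airflow internals.

```python
# Hypothetical model for illustration only; not Airflow's implementation.
class MappedPlaceholder:
    def __init__(self, key):
        self.key = key


class DeferredCall:
    """A call recorded at parse time and executed at run time."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def run(self, instance_kwargs):
        # Resolve placeholders first, then run the body with real values.
        resolved = [
            instance_kwargs[a.key] if isinstance(a, MappedPlaceholder) else a
            for a in self.args
        ]
        return self.func(*resolved)


def build_command(project, dataset):
    # The f-string runs inside the body, so it only ever sees real values.
    return f"echo {project}.{dataset}"


# "Parse time": nothing is formatted yet, the call is only recorded.
call = DeferredCall(build_command, MappedPlaceholder("project"), MappedPlaceholder("dataset"))

# "Run time" for one mapped instance:
print(call.run({"project": "my_project", "dataset": "dataset_1"}))  # echo my_project.dataset_1
```

If the model is right, building the command string inside a TaskFlow task in the group and passing its result to the classic operator may sidestep the problem; that is a suggestion, not a confirmed fix.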

@renzo-sanchez-h

renzo-sanchez-h commented May 8, 2024

Yes, sorry, I meant when using dynamic task mapping, as in the original post.

prefix = "process"
task_params = [{"task_id": "_".join([prefix, str(i + 1)]), "model_name": f"model_{i}"} for i in range(3)]
process_model_requests.expand_kwargs(task_params)
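Each dict passed to expand_kwargs is matched to the group function's parameters by name. If every key is meant to feed a group parameter, a small pure-Python check can flag keys that don't match, such as `task_id` here. `unexpected_keys` is a hypothetical helper, and `process_model_requests` is stubbed with the signature shown above.

```python
import inspect


def unexpected_keys(func, kwargs_list):
    """Return, for each dict, the sorted keys that are not parameters of func."""
    params = set(inspect.signature(func).parameters)
    return [sorted(set(kwargs) - params) for kwargs in kwargs_list]


def process_model_requests(model_name):
    # Stub with the same signature as the task group function above.
    pass


prefix = "process"
task_params = [{"task_id": "_".join([prefix, str(i + 1)]), "model_name": f"model_{i}"} for i in range(3)]

result = unexpected_keys(process_model_requests, task_params)
print(result)  # [['task_id'], ['task_id'], ['task_id']]
```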
