Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

template rendering issue in passing templates_dict param using task decorator #26390

Merged
merged 1 commit into from Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow/decorators/python.py
Expand Up @@ -35,8 +35,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
"""

template_fields: Sequence[str] = ('op_args', 'op_kwargs')
template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
template_fields: Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')
template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}
Comment on lines +38 to +39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test for the case you are mentioning


# since we won't mutate the arguments, we should just do the shallow copy
# there are some cases we can't deepcopy the objects (e.g protobuf).
Expand Down
11 changes: 10 additions & 1 deletion airflow/example_dags/example_python_operator.py
Expand Up @@ -47,6 +47,7 @@
catchup=False,
tags=['example'],
) as dag:

# [START howto_operator_python]
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
Expand All @@ -58,6 +59,14 @@ def print_context(ds=None, **kwargs):
run_this = print_context()
# [END howto_operator_python]

# [START howto_operator_python_render_sql]
@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = log_sql()
# [END howto_operator_python_render_sql]

# [START howto_operator_python_kwargs]
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
Expand All @@ -69,7 +78,7 @@ def my_sleeping_function(random_base):

sleeping_task = my_sleeping_function(random_base=float(i) / 10)

run_this >> sleeping_task
run_this >> log_the_sql >> sleeping_task
# [END howto_operator_python_kwargs]

if not shutil.which("virtualenv"):
Expand Down
24 changes: 24 additions & 0 deletions airflow/example_dags/sql/sample.sql
@@ -0,0 +1,24 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/

CREATE TABLE Orders (
order_id INT PRIMARY KEY,
name TEXT,
description TEXT
)
7 changes: 7 additions & 0 deletions docs/apache-airflow/howto/operator/python.rst
Expand Up @@ -55,6 +55,13 @@ argument.
The ``templates_dict`` argument is templated, so each value in the dictionary
is evaluated as a :ref:`Jinja template <concepts:jinja-templating>`.

.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
:language: python
:dedent: 4
:start-after: [START howto_operator_python_render_sql]
:end-before: [END howto_operator_python_render_sql]




.. _howto/operator:PythonVirtualenvOperator:
Expand Down
36 changes: 23 additions & 13 deletions tests/api_connexion/endpoints/test_task_instance_endpoint.py
Expand Up @@ -204,7 +204,7 @@ def test_should_respond_200(self, username, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": None,
Expand Down Expand Up @@ -258,7 +258,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": None,
Expand Down Expand Up @@ -302,7 +302,7 @@ def test_should_respond_200_with_task_state_in_removed(self, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": None,
Expand Down Expand Up @@ -349,7 +349,7 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": {
Expand All @@ -367,7 +367,7 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session):
"try_number": 0,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"rendered_fields": {'op_args': [], 'op_kwargs': {}},
"rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None},
"trigger": None,
"triggerer_job": None,
}
Expand Down Expand Up @@ -411,7 +411,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
'sla_miss': None,
Expand All @@ -421,7 +421,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session):
"try_number": 0,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"rendered_fields": {'op_args': [], 'op_kwargs': {}},
"rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None},
"trigger": None,
"triggerer_job": None,
}
Expand Down Expand Up @@ -832,8 +832,8 @@ def test_should_respond_200_when_task_instance_properties_are_none(
(
"with dag filter",
{"dag_ids": ["example_python_operator", "example_skip_dag"]},
16,
16,
17,
17,
),
],
)
Expand Down Expand Up @@ -1154,6 +1154,10 @@ def test_should_respond_200_with_reset_dag_run(self, session):
"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=4),
"state": State.RUNNING,
},
{
"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=5),
"state": State.RUNNING,
},
]

self.create_task_instances(
Expand Down Expand Up @@ -1182,30 +1186,36 @@ def test_should_respond_200_with_reset_dag_run(self, session):
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_1',
'execution_date': '2020-01-02T00:00:00+00:00',
'task_id': 'sleep_for_0',
'task_id': 'log_sql_query',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_2',
'execution_date': '2020-01-03T00:00:00+00:00',
'task_id': 'sleep_for_1',
'task_id': 'sleep_for_0',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_3',
'execution_date': '2020-01-04T00:00:00+00:00',
'task_id': 'sleep_for_2',
'task_id': 'sleep_for_1',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_4',
'execution_date': '2020-01-05T00:00:00+00:00',
'task_id': 'sleep_for_2',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_5',
'execution_date': '2020-01-06T00:00:00+00:00',
'task_id': 'sleep_for_3',
},
]
for task_instance in expected_response:
assert task_instance in response.json["task_instances"]
assert 5 == len(response.json["task_instances"])
assert 6 == len(response.json["task_instances"])
assert 0 == failed_dag_runs, 0

def test_should_raises_401_unauthenticated(self):
Expand Down
8 changes: 4 additions & 4 deletions tests/serialization/test_dag_serialization.py
Expand Up @@ -2151,8 +2151,8 @@ def x(arg1, arg2, arg3):
'ui_fgcolor': '#000',
'task_id': 'x',
'template_ext': [],
'template_fields': ['op_args', 'op_kwargs'],
'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"},
'template_fields': ['templates_dict', 'op_args', 'op_kwargs'],
'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"},
"_disallow_kwargs_override": False,
'_expand_input_attr': 'op_kwargs_expand_input',
}
Expand Down Expand Up @@ -2234,8 +2234,8 @@ def x(arg1, arg2, arg3):
'ui_fgcolor': '#000',
'task_id': 'x',
'template_ext': [],
'template_fields': ['op_args', 'op_kwargs'],
'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"},
'template_fields': ['templates_dict', 'op_args', 'op_kwargs'],
'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"},
"_disallow_kwargs_override": strict,
'_expand_input_attr': 'op_kwargs_expand_input',
}
Expand Down