Skip to content

Commit

Permalink
template rendering issue fix (#26390)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bowrna committed Sep 19, 2022
1 parent 24d88e8 commit 4bf0cb9
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 20 deletions.
4 changes: 2 additions & 2 deletions airflow/decorators/python.py
Expand Up @@ -35,8 +35,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
"""

template_fields: Sequence[str] = ('op_args', 'op_kwargs')
template_fields_renderers = {"op_args": "py", "op_kwargs": "py"}
template_fields: Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')
template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}

# since we won't mutate the arguments, we should just do the shallow copy
# there are some cases we can't deepcopy the objects (e.g protobuf).
Expand Down
11 changes: 10 additions & 1 deletion airflow/example_dags/example_python_operator.py
Expand Up @@ -47,6 +47,7 @@
catchup=False,
tags=['example'],
) as dag:

# [START howto_operator_python]
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
Expand All @@ -58,6 +59,14 @@ def print_context(ds=None, **kwargs):
run_this = print_context()
# [END howto_operator_python]

# [START howto_operator_python_render_sql]
@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = log_sql()
# [END howto_operator_python_render_sql]

# [START howto_operator_python_kwargs]
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
Expand All @@ -69,7 +78,7 @@ def my_sleeping_function(random_base):

sleeping_task = my_sleeping_function(random_base=float(i) / 10)

run_this >> sleeping_task
run_this >> log_the_sql >> sleeping_task
# [END howto_operator_python_kwargs]

if not shutil.which("virtualenv"):
Expand Down
24 changes: 24 additions & 0 deletions airflow/example_dags/sql/sample.sql
@@ -0,0 +1,24 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/

CREATE TABLE Orders (
order_id INT PRIMARY KEY,
name TEXT,
description TEXT
)
7 changes: 7 additions & 0 deletions docs/apache-airflow/howto/operator/python.rst
Expand Up @@ -55,6 +55,13 @@ argument.
The ``templates_dict`` argument is templated, so each value in the dictionary
is evaluated as a :ref:`Jinja template <concepts:jinja-templating>`.

.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
:language: python
:dedent: 4
:start-after: [START howto_operator_python_render_sql]
:end-before: [END howto_operator_python_render_sql]




.. _howto/operator:PythonVirtualenvOperator:
Expand Down
36 changes: 23 additions & 13 deletions tests/api_connexion/endpoints/test_task_instance_endpoint.py
Expand Up @@ -204,7 +204,7 @@ def test_should_respond_200(self, username, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": None,
Expand Down Expand Up @@ -258,7 +258,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": None,
Expand Down Expand Up @@ -302,7 +302,7 @@ def test_should_respond_200_with_task_state_in_removed(self, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": None,
Expand Down Expand Up @@ -349,7 +349,7 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
"sla_miss": {
Expand All @@ -367,7 +367,7 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session):
"try_number": 0,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"rendered_fields": {'op_args': [], 'op_kwargs': {}},
"rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None},
"trigger": None,
"triggerer_job": None,
}
Expand Down Expand Up @@ -411,7 +411,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session):
"pid": 100,
"pool": "default_pool",
"pool_slots": 1,
"priority_weight": 8,
"priority_weight": 9,
"queue": "default_queue",
"queued_when": None,
'sla_miss': None,
Expand All @@ -421,7 +421,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session):
"try_number": 0,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
"rendered_fields": {'op_args': [], 'op_kwargs': {}},
"rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None},
"trigger": None,
"triggerer_job": None,
}
Expand Down Expand Up @@ -832,8 +832,8 @@ def test_should_respond_200_when_task_instance_properties_are_none(
(
"with dag filter",
{"dag_ids": ["example_python_operator", "example_skip_dag"]},
16,
16,
17,
17,
),
],
)
Expand Down Expand Up @@ -1154,6 +1154,10 @@ def test_should_respond_200_with_reset_dag_run(self, session):
"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=4),
"state": State.RUNNING,
},
{
"execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=5),
"state": State.RUNNING,
},
]

self.create_task_instances(
Expand Down Expand Up @@ -1182,30 +1186,36 @@ def test_should_respond_200_with_reset_dag_run(self, session):
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_1',
'execution_date': '2020-01-02T00:00:00+00:00',
'task_id': 'sleep_for_0',
'task_id': 'log_sql_query',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_2',
'execution_date': '2020-01-03T00:00:00+00:00',
'task_id': 'sleep_for_1',
'task_id': 'sleep_for_0',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_3',
'execution_date': '2020-01-04T00:00:00+00:00',
'task_id': 'sleep_for_2',
'task_id': 'sleep_for_1',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_4',
'execution_date': '2020-01-05T00:00:00+00:00',
'task_id': 'sleep_for_2',
},
{
'dag_id': 'example_python_operator',
'dag_run_id': 'TEST_DAG_RUN_ID_5',
'execution_date': '2020-01-06T00:00:00+00:00',
'task_id': 'sleep_for_3',
},
]
for task_instance in expected_response:
assert task_instance in response.json["task_instances"]
assert 5 == len(response.json["task_instances"])
assert 6 == len(response.json["task_instances"])
assert 0 == failed_dag_runs, 0

def test_should_raises_401_unauthenticated(self):
Expand Down
8 changes: 4 additions & 4 deletions tests/serialization/test_dag_serialization.py
Expand Up @@ -2151,8 +2151,8 @@ def x(arg1, arg2, arg3):
'ui_fgcolor': '#000',
'task_id': 'x',
'template_ext': [],
'template_fields': ['op_args', 'op_kwargs'],
'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"},
'template_fields': ['templates_dict', 'op_args', 'op_kwargs'],
'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"},
"_disallow_kwargs_override": False,
'_expand_input_attr': 'op_kwargs_expand_input',
}
Expand Down Expand Up @@ -2234,8 +2234,8 @@ def x(arg1, arg2, arg3):
'ui_fgcolor': '#000',
'task_id': 'x',
'template_ext': [],
'template_fields': ['op_args', 'op_kwargs'],
'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"},
'template_fields': ['templates_dict', 'op_args', 'op_kwargs'],
'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"},
"_disallow_kwargs_override": strict,
'_expand_input_attr': 'op_kwargs_expand_input',
}
Expand Down

0 comments on commit 4bf0cb9

Please sign in to comment.