From 88f6bf18f2c3fffef7fa4c2331a2f9545817c794 Mon Sep 17 00:00:00 2001 From: Bowrna Date: Wed, 14 Sep 2022 15:28:12 +0530 Subject: [PATCH] template rendering issue fix --- airflow/decorators/python.py | 4 +-- .../example_dags/example_python_operator.py | 11 +++++- airflow/example_dags/sql/sample.sql | 24 +++++++++++++ docs/apache-airflow/howto/operator/python.rst | 7 ++++ .../endpoints/test_task_instance_endpoint.py | 36 ++++++++++++------- tests/serialization/test_dag_serialization.py | 8 ++--- 6 files changed, 70 insertions(+), 20 deletions(-) create mode 100644 airflow/example_dags/sql/sample.sql diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py index 0568e4809ec3d..8800e7bf9a8de 100644 --- a/airflow/decorators/python.py +++ b/airflow/decorators/python.py @@ -35,8 +35,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator): multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. """ - template_fields: Sequence[str] = ('op_args', 'op_kwargs') - template_fields_renderers = {"op_args": "py", "op_kwargs": "py"} + template_fields: Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs') + template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"} # since we won't mutate the arguments, we should just do the shallow copy # there are some cases we can't deepcopy the objects (e.g protobuf). diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index d8284b19acd0c..3be89bc91c9cd 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -47,6 +47,7 @@ catchup=False, tags=['example'], ) as dag: + # [START howto_operator_python] @task(task_id="print_the_context") def print_context(ds=None, **kwargs): @@ -58,6 +59,14 @@ def print_context(ds=None, **kwargs): run_this = print_context() # [END howto_operator_python] + # [START howto_operator_python_render_sql] + @task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"]) + def log_sql(**kwargs): + logging.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"])) + + log_the_sql = log_sql() + # [END howto_operator_python_render_sql] + # [START howto_operator_python_kwargs] # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively for i in range(5): @@ -69,7 +78,7 @@ def my_sleeping_function(random_base): sleeping_task = my_sleeping_function(random_base=float(i) / 10) - run_this >> sleeping_task + run_this >> log_the_sql >> sleeping_task # [END howto_operator_python_kwargs] if not shutil.which("virtualenv"): diff --git a/airflow/example_dags/sql/sample.sql b/airflow/example_dags/sql/sample.sql new file mode 100644 index 0000000000000..23af6ab4b9bb3 --- /dev/null +++ b/airflow/example_dags/sql/sample.sql @@ -0,0 +1,24 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +CREATE TABLE Orders ( + order_id INT PRIMARY KEY, + name TEXT, + description TEXT +) diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst index 7128a2a5e00be..8ba2ac64ebaed 100644 --- a/docs/apache-airflow/howto/operator/python.rst +++ b/docs/apache-airflow/howto/operator/python.rst @@ -55,6 +55,13 @@ argument. The ``templates_dict`` argument is templated, so each value in the dictionary is evaluated as a :ref:`Jinja template `. +.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_python_render_sql] + :end-before: [END howto_operator_python_render_sql] + + .. _howto/operator:PythonVirtualenvOperator: diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 44f21aa95a393..092e282c55e65 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -204,7 +204,7 @@ def test_should_respond_200(self, username, session): "pid": 100, "pool": "default_pool", "pool_slots": 1, - "priority_weight": 8, + "priority_weight": 9, "queue": "default_queue", "queued_when": None, "sla_miss": None, @@ -258,7 +258,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, session): "pid": 100, "pool": "default_pool", "pool_slots": 1, - "priority_weight": 8, + "priority_weight": 9, "queue": "default_queue", "queued_when": None, "sla_miss": None, @@ -302,7 +302,7 @@ def test_should_respond_200_with_task_state_in_removed(self, session): "pid": 100, "pool": "default_pool", "pool_slots": 1, - "priority_weight": 8, + "priority_weight": 9, "queue": "default_queue", "queued_when": None, "sla_miss": None, @@ -349,7 +349,7 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): "pid": 100, "pool": "default_pool", "pool_slots": 1, - "priority_weight": 8, + "priority_weight": 9, "queue": "default_queue", "queued_when": None, "sla_miss": { @@ -367,7 +367,7 @@ def test_should_respond_200_task_instance_with_sla_and_rendered(self, session): "try_number": 0, "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", - "rendered_fields": {'op_args': [], 'op_kwargs': {}}, + "rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None}, "trigger": None, "triggerer_job": None, } @@ -411,7 +411,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session): "pid": 100, "pool": "default_pool", "pool_slots": 1, - "priority_weight": 8, + "priority_weight": 9, "queue": "default_queue", "queued_when": None, 'sla_miss': None, @@ -421,7 +421,7 @@ def test_should_respond_200_mapped_task_instance_with_rtif(self, session): "try_number": 0, "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", - "rendered_fields": {'op_args': [], 'op_kwargs': {}}, + "rendered_fields": {'op_args': [], 'op_kwargs': {}, 'templates_dict': None}, "trigger": None, "triggerer_job": None, } @@ -832,8 +832,8 @@ def test_should_respond_200_when_task_instance_properties_are_none( ( "with dag filter", {"dag_ids": ["example_python_operator", "example_skip_dag"]}, - 16, - 16, + 17, + 17, ), ], ) @@ -1154,6 +1154,10 @@ def test_should_respond_200_with_reset_dag_run(self, session): "execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=4), "state": State.RUNNING, }, + { + "execution_date": DEFAULT_DATETIME_1 + dt.timedelta(days=5), + "state": State.RUNNING, + }, ] self.create_task_instances( @@ -1182,30 +1186,36 @@ def test_should_respond_200_with_reset_dag_run(self, session): 'dag_id': 'example_python_operator', 'dag_run_id': 'TEST_DAG_RUN_ID_1', 'execution_date': '2020-01-02T00:00:00+00:00', - 'task_id': 'sleep_for_0', + 'task_id': 'log_sql_query', }, { 'dag_id': 'example_python_operator', 'dag_run_id': 'TEST_DAG_RUN_ID_2', 'execution_date': '2020-01-03T00:00:00+00:00', - 'task_id': 'sleep_for_1', + 'task_id': 'sleep_for_0', }, { 'dag_id': 'example_python_operator', 'dag_run_id': 'TEST_DAG_RUN_ID_3', 'execution_date': '2020-01-04T00:00:00+00:00', - 'task_id': 'sleep_for_2', + 'task_id': 'sleep_for_1', }, { 'dag_id': 'example_python_operator', 'dag_run_id': 'TEST_DAG_RUN_ID_4', 'execution_date': '2020-01-05T00:00:00+00:00', + 'task_id': 'sleep_for_2', + }, + { + 'dag_id': 'example_python_operator', + 'dag_run_id': 'TEST_DAG_RUN_ID_5', + 'execution_date': '2020-01-06T00:00:00+00:00', 'task_id': 'sleep_for_3', }, ] for task_instance in expected_response: assert task_instance in response.json["task_instances"] - assert 5 == len(response.json["task_instances"]) + assert 6 == len(response.json["task_instances"]) assert 0 == failed_dag_runs, 0 def test_should_raises_401_unauthenticated(self): diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index aa409183e895c..44a218290ea53 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -2151,8 +2151,8 @@ def x(arg1, arg2, arg3): 'ui_fgcolor': '#000', 'task_id': 'x', 'template_ext': [], - 'template_fields': ['op_args', 'op_kwargs'], - 'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"}, + 'template_fields': ['templates_dict', 'op_args', 'op_kwargs'], + 'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "_disallow_kwargs_override": False, '_expand_input_attr': 'op_kwargs_expand_input', } @@ -2234,8 +2234,8 @@ def x(arg1, arg2, arg3): 'ui_fgcolor': '#000', 'task_id': 'x', 'template_ext': [], - 'template_fields': ['op_args', 'op_kwargs'], - 'template_fields_renderers': {"op_args": "py", "op_kwargs": "py"}, + 'template_fields': ['templates_dict', 'op_args', 'op_kwargs'], + 'template_fields_renderers': {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}, "_disallow_kwargs_override": strict, '_expand_input_attr': 'op_kwargs_expand_input', }