Skip to content

Commit

Permalink
adds ability to pass config params to postgres operator (apache#21551)
Browse files Browse the repository at this point in the history
  • Loading branch information
victorphoenix3 committed Mar 18, 2022
1 parent ecc5b74 commit 0ec5677
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 9 deletions.
11 changes: 4 additions & 7 deletions airflow/providers/postgres/example_dags/example_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,11 @@
# [START postgres_operator_howto_guide_get_birth_date]
get_birth_date = PostgresOperator(
task_id="get_birth_date",
sql="""
SELECT * FROM pet
WHERE birth_date
BETWEEN SYMMETRIC DATE '{{ params.begin_date }}' AND DATE '{{ params.end_date }}';
""",
params={'begin_date': '2020-01-01', 'end_date': '2020-12-31'},
sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
runtime_parameters={'statement_timeout': '3000ms'},
)
# [START postgres_operator_howto_guide_get_birth_date]
# [END postgres_operator_howto_guide_get_birth_date]

create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date
# [END postgres_operator_howto_guide]
23 changes: 22 additions & 1 deletion airflow/providers/postgres/operators/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# under the License.
from typing import TYPE_CHECKING, Iterable, List, Mapping, Optional, Sequence, Union

from psycopg2.sql import SQL, Identifier

from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.www import utils as wwwutils
Expand Down Expand Up @@ -56,6 +58,7 @@ def __init__(
autocommit: bool = False,
parameters: Optional[Union[Mapping, Iterable]] = None,
database: Optional[str] = None,
runtime_parameters: Optional[Mapping] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -64,10 +67,28 @@ def __init__(
self.autocommit = autocommit
self.parameters = parameters
self.database = database
self.runtime_parameters = runtime_parameters
self.hook: Optional[PostgresHook] = None

def execute(self, context: 'Context'):
self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
if self.runtime_parameters:
final_sql = []
sql_param = {}
for param in self.runtime_parameters:
set_param_sql = f"SET {{}} TO %({param})s;"
dynamic_sql = SQL(set_param_sql).format(Identifier(f"{param}"))
final_sql.append(dynamic_sql)
for param, val in self.runtime_parameters.items():
sql_param.update({f"{param}": f"{val}"})
if self.parameters:
sql_param.update(self.parameters)
if isinstance(self.sql, str):
final_sql.append(SQL(self.sql))
else:
final_sql.extend(list(map(SQL, self.sql)))
self.hook.run(final_sql, self.autocommit, parameters=sql_param)
else:
self.hook.run(self.sql, self.autocommit, parameters=self.parameters)
for output in self.hook.conn.notices:
self.log.info(output)
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,18 @@ class.
params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
Passing Server Configuration Parameters into PostgresOperator
-------------------------------------------------------------

PostgresOperator provides the optional ``runtime_parameters`` attribute which makes it possible to set
the `server configuration parameter values <https://www.postgresql.org/docs/current/runtime-config-client.html>`_ for the SQL request during runtime.

.. exampleinclude:: /../../airflow/providers/postgres/example_dags/example_postgres.py
:language: python
:start-after: [START postgres_operator_howto_guide_get_birth_date]
:end-before: [END postgres_operator_howto_guide_get_birth_date]


The complete Postgres Operator DAG
----------------------------------

Expand All @@ -172,4 +184,5 @@ In this how-to guide we explored the Apache Airflow PostgreOperator. Let's quick
In Airflow-2.0, PostgresOperator class now resides in the ``providers`` package. It is best practice to create subdirectory
called ``sql`` in your ``dags`` directory where you can store your sql files. This will make your code more elegant and more
maintainable. And finally, we looked at the different ways you can dynamically pass parameters into our PostgresOperator
tasks using ``parameters`` or ``params`` attribute.
tasks using ``parameters`` or ``params`` attribute and how you can control the server configuration parameters by passing
the ``runtime_parameters`` attribute.
15 changes: 15 additions & 0 deletions tests/providers/postgres/operators/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,18 @@ def test_overwrite_schema(self):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
except OperationalError as e:
assert 'database "foobar" does not exist' in str(e)

def test_runtime_parameter_setting(self):
"""
Verifies ability to pass server configuration parameters to
PostgresOperator
"""

sql = "SELECT 1;"
op = PostgresOperator(
task_id='postgres_operator_test_runtime_parameter_setting',
sql=sql,
dag=self.dag,
runtime_parameters={'statement_timeout': '3000ms'},
)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

0 comments on commit 0ec5677

Please sign in to comment.