Skip to content

Commit

Permalink
Clarify pendulum use in timezone cases (#21646)
Browse files Browse the repository at this point in the history
It is important to use Pendulum in case timezone is used - because
there are a number of limitations coming from using stdlib
timezone implementation.

However our documentation was not very clear about it, especially
some examples shown using standard datetime in DAGs which could
mislead our users to continue using datetime if they use timezone.

This PR clarifies and stresses the use of pendulum is necessary
when timezone is used. Also it points to the documentation
in case serialization throws error about not using Pendulum
so that the users can learn about the reasoning.

This is the first part of the change - the follow up will be
changing all provider examples to also use timezone and
pendulum explicitly.

See also #20070
  • Loading branch information
potiuk committed Feb 18, 2022
1 parent 38894e8 commit f011da2
Show file tree
Hide file tree
Showing 43 changed files with 216 additions and 120 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Expand Up @@ -1438,7 +1438,7 @@ We are using certain prefixes for email subjects for different purposes. Start y
Voting is governed by the rules described in `Voting <https://www.apache.org/foundation/voting.html>`_

We are all devoting our time for community as individuals who except for being active in Apache Airflow have
families, daily jobs, right for vacation. Sometimes we are in different time zones or simply are
families, daily jobs, right for vacation. Sometimes we are in different timezones or simply are
busy with day-to-day duties that our response time might be delayed. For us it's crucial
to remember to respect each other in the project with no formal structure.
There are no managers, departments, most of us is autonomous in our opinions, decisions.
Expand Down
6 changes: 3 additions & 3 deletions UPDATING.md
Expand Up @@ -245,7 +245,7 @@ Similarly, `DAG.concurrency` has been renamed to `DAG.max_active_tasks`.
```python
dag = DAG(
dag_id="example_dag",
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
concurrency=3,
)
Expand All @@ -256,7 +256,7 @@ dag = DAG(
```python
dag = DAG(
dag_id="example_dag",
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
max_active_tasks=3,
)
Expand Down Expand Up @@ -3329,7 +3329,7 @@ Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow.models.dag import DAG
>>> from airflow.operators.dummy import DummyOperator
>>>
>>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1))
>>> dag = DAG('simple_dag', start_date=pendulum.datetime(2017, 9, 1, tz="UTC"))
>>>
>>> task = DummyOperator(task_id='task_1', dag=dag)
>>>
Expand Down
8 changes: 5 additions & 3 deletions airflow/example_dags/example_bash_operator.py
Expand Up @@ -18,7 +18,9 @@

"""Example DAG demonstrating the usage of the BashOperator."""

from datetime import datetime, timedelta
import datetime

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator
Expand All @@ -27,9 +29,9 @@
with DAG(
dag_id='example_bash_operator',
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=timedelta(minutes=60),
dagrun_timeout=datetime.timedelta(minutes=60),
tags=['example', 'example2'],
params={"example_key": "example_value"},
) as dag:
Expand Down
14 changes: 7 additions & 7 deletions airflow/example_dags/example_branch_datetime_operator.py
Expand Up @@ -20,15 +20,15 @@
Example DAG demonstrating the usage of DateTimeBranchOperator with datetime as well as time objects as
targets.
"""
import datetime
import pendulum

from airflow import DAG
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.dummy import DummyOperator

dag = DAG(
dag_id="example_branch_datetime_operator",
start_date=datetime.datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule_interval="@daily",
Expand All @@ -42,8 +42,8 @@
task_id='datetime_branch',
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
target_upper=datetime.datetime(2020, 10, 10, 15, 0, 0),
target_lower=datetime.datetime(2020, 10, 10, 14, 0, 0),
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag,
)

Expand All @@ -54,7 +54,7 @@

dag = DAG(
dag_id="example_branch_datetime_operator_2",
start_date=datetime.datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule_interval="@daily",
Expand All @@ -67,8 +67,8 @@
task_id='datetime_branch',
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
target_upper=datetime.time(0, 0, 0),
target_lower=datetime.time(15, 0, 0),
target_upper=pendulum.time(0, 0, 0),
target_lower=pendulum.time(15, 0, 0),
dag=dag,
)

Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_branch_day_of_week_operator.py
Expand Up @@ -19,15 +19,15 @@
"""
Example DAG demonstrating the usage of BranchDayOfWeekOperator.
"""
from datetime import datetime
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator

with DAG(
dag_id="example_weekday_branch_operator",
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule_interval="@daily",
Expand Down
7 changes: 5 additions & 2 deletions airflow/example_dags/example_branch_labels.py
Expand Up @@ -19,14 +19,17 @@
"""
Example DAG demonstrating the usage of labels with different branches.
"""
from datetime import datetime
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.edgemodifier import Label

with DAG(
"example_branch_labels", schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False
"example_branch_labels",
schedule_interval="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
ingest = DummyOperator(task_id="ingest")
analyse = DummyOperator(task_id="analyze")
Expand Down
5 changes: 3 additions & 2 deletions airflow/example_dags/example_branch_operator.py
Expand Up @@ -19,7 +19,8 @@
"""Example DAG demonstrating the usage of the BranchPythonOperator."""

import random
from datetime import datetime

import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
Expand All @@ -29,7 +30,7 @@

with DAG(
dag_id='example_branch_operator',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
tags=['example', 'example2'],
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_branch_python_dop_operator_3.py
Expand Up @@ -20,7 +20,7 @@
Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run
or skipped on alternating runs.
"""
from datetime import datetime
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
Expand Down Expand Up @@ -49,7 +49,7 @@ def should_run(**kwargs):
with DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
default_args={'depends_on_past': True},
tags=['example'],
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_complex.py
Expand Up @@ -19,7 +19,7 @@
"""
Example Airflow DAG that shows the complex DAG structure.
"""
from datetime import datetime
import pendulum

from airflow import models
from airflow.models.baseoperator import chain
Expand All @@ -28,7 +28,7 @@
with models.DAG(
dag_id="example_complex",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example', 'example2', 'example3'],
) as dag:
Expand Down
9 changes: 7 additions & 2 deletions airflow/example_dags/example_dag_decorator.py
Expand Up @@ -15,10 +15,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Any, Dict

import httpx
import pendulum

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator
Expand All @@ -38,7 +38,12 @@ def execute(self, context: Context):


# [START dag_decorator_usage]
@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
@dag(
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
)
def example_dag_decorator(email: str = 'example@example.com'):
"""
DAG to send server IP to email.
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_external_task_marker_dag.py
Expand Up @@ -37,13 +37,13 @@
exception
"""

import datetime
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor

start_date = datetime.datetime(2021, 1, 1)
start_date = pendulum.datetime(2021, 1, 1, tz="UTC")

with DAG(
dag_id="example_external_task_marker_parent",
Expand Down
5 changes: 3 additions & 2 deletions airflow/example_dags/example_kubernetes_executor.py
Expand Up @@ -20,7 +20,8 @@
"""
import logging
import os
from datetime import datetime

import pendulum

from airflow import DAG
from airflow.configuration import conf
Expand All @@ -45,7 +46,7 @@
with DAG(
dag_id='example_kubernetes_executor',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example3'],
) as dag:
Expand Down
8 changes: 5 additions & 3 deletions airflow/example_dags/example_latest_only_with_trigger.py
Expand Up @@ -20,7 +20,9 @@
"""

# [START example]
import datetime as dt
import datetime

import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
Expand All @@ -29,8 +31,8 @@

with DAG(
dag_id='latest_only_with_trigger',
schedule_interval=dt.timedelta(hours=4),
start_date=dt.datetime(2021, 1, 1),
schedule_interval=datetime.timedelta(hours=4),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example3'],
) as dag:
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_nested_branch_dag.py
Expand Up @@ -21,7 +21,7 @@
``none_failed_min_one_success`` trigger rule such that they are skipped whenever their corresponding
``BranchPythonOperator`` are skipped.
"""
from datetime import datetime
import pendulum

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
Expand All @@ -30,7 +30,7 @@

with DAG(
dag_id="example_nested_branch_dag",
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
tags=["example"],
Expand Down
Expand Up @@ -18,10 +18,12 @@

"""Example DAG demonstrating the usage of the params arguments in templated arguments."""

import datetime
import os
from datetime import datetime, timedelta
from textwrap import dedent

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
Expand Down Expand Up @@ -58,9 +60,9 @@ def print_env_vars(test_mode=None):
with DAG(
"example_passing_params_via_test_command",
schedule_interval='*/1 * * * *',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=timedelta(minutes=4),
dagrun_timeout=datetime.timedelta(minutes=4),
tags=['example'],
) as dag:
run_this = my_py_command(params={"miff": "agg"})
Expand Down
5 changes: 3 additions & 2 deletions airflow/example_dags/example_python_operator.py
Expand Up @@ -23,9 +23,10 @@
import logging
import shutil
import time
from datetime import datetime
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

Expand All @@ -34,7 +35,7 @@
with DAG(
dag_id='example_python_operator',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
) as dag:
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_short_circuit_operator.py
Expand Up @@ -17,7 +17,7 @@
# under the License.

"""Example DAG demonstrating the usage of the ShortCircuitOperator."""
from datetime import datetime
import pendulum

from airflow import DAG
from airflow.models.baseoperator import chain
Expand All @@ -27,7 +27,7 @@

with DAG(
dag_id='example_short_circuit_operator',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
) as dag:
Expand Down
9 changes: 7 additions & 2 deletions airflow/example_dags/example_skip_dag.py
Expand Up @@ -18,7 +18,7 @@

"""Example DAG demonstrating the DummyOperator and a custom DummySkipOperator which skips by default."""

from datetime import datetime
import pendulum

from airflow import DAG
from airflow.exceptions import AirflowSkipException
Expand Down Expand Up @@ -55,6 +55,11 @@ def create_test_pipeline(suffix, trigger_rule):
join >> final


with DAG(dag_id='example_skip_dag', start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) as dag:
with DAG(
dag_id='example_skip_dag',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
) as dag:
create_test_pipeline('1', TriggerRule.ALL_SUCCESS)
create_test_pipeline('2', TriggerRule.ONE_SUCCESS)

0 comments on commit f011da2

Please sign in to comment.