Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify pendulum use in timezone cases #21646

Merged
merged 3 commits into from Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Expand Up @@ -1438,7 +1438,7 @@ We are using certain prefixes for email subjects for different purposes. Start y
Voting is governed by the rules described in `Voting <https://www.apache.org/foundation/voting.html>`_

We are all devoting our time for community as individuals who except for being active in Apache Airflow have
families, daily jobs, right for vacation. Sometimes we are in different time zones or simply are
families, daily jobs, right for vacation. Sometimes we are in different timezones or simply are
busy with day-to-day duties that our response time might be delayed. For us it's crucial
to remember to respect each other in the project with no formal structure.
There are no managers, departments, most of us is autonomous in our opinions, decisions.
Expand Down
6 changes: 3 additions & 3 deletions UPDATING.md
Expand Up @@ -245,7 +245,7 @@ Similarly, `DAG.concurrency` has been renamed to `DAG.max_active_tasks`.
```python
dag = DAG(
dag_id="example_dag",
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
concurrency=3,
)
Expand All @@ -256,7 +256,7 @@ dag = DAG(
```python
dag = DAG(
dag_id="example_dag",
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
max_active_tasks=3,
)
Expand Down Expand Up @@ -3329,7 +3329,7 @@ Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow.models.dag import DAG
>>> from airflow.operators.dummy import DummyOperator
>>>
>>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1))
>>> dag = DAG('simple_dag', start_date=pendulum.datetime(2017, 9, 1, tz="UTC"))
>>>
>>> task = DummyOperator(task_id='task_1', dag=dag)
>>>
Expand Down
8 changes: 5 additions & 3 deletions airflow/example_dags/example_bash_operator.py
Expand Up @@ -18,7 +18,9 @@

"""Example DAG demonstrating the usage of the BashOperator."""

from datetime import datetime, timedelta
import datetime

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator
Expand All @@ -27,9 +29,9 @@
with DAG(
dag_id='example_bash_operator',
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1),
potiuk marked this conversation as resolved.
Show resolved Hide resolved
catchup=False,
dagrun_timeout=timedelta(minutes=60),
dagrun_timeout=datetime.timedelta(minutes=60),
tags=['example', 'example2'],
params={"example_key": "example_value"},
) as dag:
Expand Down
14 changes: 7 additions & 7 deletions airflow/example_dags/example_branch_datetime_operator.py
Expand Up @@ -20,15 +20,15 @@
Example DAG demonstrating the usage of DateTimeBranchOperator with datetime as well as time objects as
targets.
"""
import datetime
import pendulum

from airflow import DAG
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.dummy import DummyOperator

dag = DAG(
dag_id="example_branch_datetime_operator",
start_date=datetime.datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1),
potiuk marked this conversation as resolved.
Show resolved Hide resolved
catchup=False,
tags=["example"],
schedule_interval="@daily",
Expand All @@ -42,8 +42,8 @@
task_id='datetime_branch',
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
target_upper=datetime.datetime(2020, 10, 10, 15, 0, 0),
target_lower=datetime.datetime(2020, 10, 10, 14, 0, 0),
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag,
)

Expand All @@ -54,7 +54,7 @@

dag = DAG(
dag_id="example_branch_datetime_operator_2",
start_date=datetime.datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule_interval="@daily",
Expand All @@ -67,8 +67,8 @@
task_id='datetime_branch',
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
target_upper=datetime.time(0, 0, 0),
target_lower=datetime.time(15, 0, 0),
target_upper=pendulum.time(0, 0, 0),
target_lower=pendulum.time(15, 0, 0),
dag=dag,
)

Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_branch_day_of_week_operator.py
Expand Up @@ -19,15 +19,15 @@
"""
Example DAG demonstrating the usage of BranchDayOfWeekOperator.
"""
from datetime import datetime
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator

with DAG(
dag_id="example_weekday_branch_operator",
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
schedule_interval="@daily",
Expand Down
7 changes: 5 additions & 2 deletions airflow/example_dags/example_branch_labels.py
Expand Up @@ -19,14 +19,17 @@
"""
Example DAG demonstrating the usage of labels with different branches.
"""
from datetime import datetime
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.edgemodifier import Label

with DAG(
"example_branch_labels", schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False
"example_branch_labels",
schedule_interval="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
ingest = DummyOperator(task_id="ingest")
analyse = DummyOperator(task_id="analyze")
Expand Down
5 changes: 3 additions & 2 deletions airflow/example_dags/example_branch_operator.py
Expand Up @@ -19,7 +19,8 @@
"""Example DAG demonstrating the usage of the BranchPythonOperator."""

import random
from datetime import datetime

import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
Expand All @@ -29,7 +30,7 @@

with DAG(
dag_id='example_branch_operator',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
tags=['example', 'example2'],
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_branch_python_dop_operator_3.py
Expand Up @@ -20,7 +20,7 @@
Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run
or skipped on alternating runs.
"""
from datetime import datetime
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
Expand Down Expand Up @@ -49,7 +49,7 @@ def should_run(**kwargs):
with DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
default_args={'depends_on_past': True},
tags=['example'],
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_complex.py
Expand Up @@ -19,7 +19,7 @@
"""
Example Airflow DAG that shows the complex DAG structure.
"""
from datetime import datetime
import pendulum

from airflow import models
from airflow.models.baseoperator import chain
Expand All @@ -28,7 +28,7 @@
with models.DAG(
dag_id="example_complex",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example', 'example2', 'example3'],
) as dag:
Expand Down
9 changes: 7 additions & 2 deletions airflow/example_dags/example_dag_decorator.py
Expand Up @@ -15,10 +15,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Any, Dict

import httpx
import pendulum

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator
Expand All @@ -38,7 +38,12 @@ def execute(self, context: Context):


# [START dag_decorator_usage]
@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'])
@dag(
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
)
def example_dag_decorator(email: str = 'example@example.com'):
"""
DAG to send server IP to email.
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_external_task_marker_dag.py
Expand Up @@ -37,13 +37,13 @@
exception
"""

import datetime
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor

start_date = datetime.datetime(2021, 1, 1)
start_date = pendulum.datetime(2021, 1, 1, tz="UTC")

with DAG(
dag_id="example_external_task_marker_parent",
Expand Down
5 changes: 3 additions & 2 deletions airflow/example_dags/example_kubernetes_executor.py
Expand Up @@ -20,7 +20,8 @@
"""
import logging
import os
from datetime import datetime

import pendulum

from airflow import DAG
from airflow.configuration import conf
Expand All @@ -45,7 +46,7 @@
with DAG(
dag_id='example_kubernetes_executor',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example3'],
) as dag:
Expand Down
8 changes: 5 additions & 3 deletions airflow/example_dags/example_latest_only_with_trigger.py
Expand Up @@ -20,7 +20,9 @@
"""

# [START example]
import datetime as dt
import datetime

import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
Expand All @@ -29,8 +31,8 @@

with DAG(
dag_id='latest_only_with_trigger',
schedule_interval=dt.timedelta(hours=4),
start_date=dt.datetime(2021, 1, 1),
schedule_interval=datetime.timedelta(hours=4),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example3'],
) as dag:
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_nested_branch_dag.py
Expand Up @@ -21,7 +21,7 @@
``none_failed_min_one_success`` trigger rule such that they are skipped whenever their corresponding
``BranchPythonOperator`` are skipped.
"""
from datetime import datetime
import pendulum

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
Expand All @@ -30,7 +30,7 @@

with DAG(
dag_id="example_nested_branch_dag",
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
tags=["example"],
Expand Down
Expand Up @@ -18,10 +18,12 @@

"""Example DAG demonstrating the usage of the params arguments in templated arguments."""

import datetime
import os
from datetime import datetime, timedelta
from textwrap import dedent

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
Expand Down Expand Up @@ -58,9 +60,9 @@ def print_env_vars(test_mode=None):
with DAG(
"example_passing_params_via_test_command",
schedule_interval='*/1 * * * *',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=timedelta(minutes=4),
dagrun_timeout=datetime.timedelta(minutes=4),
tags=['example'],
) as dag:
run_this = my_py_command(params={"miff": "agg"})
Expand Down
5 changes: 3 additions & 2 deletions airflow/example_dags/example_python_operator.py
Expand Up @@ -23,9 +23,10 @@
import logging
import shutil
import time
from datetime import datetime
from pprint import pprint

import pendulum

from airflow import DAG
from airflow.decorators import task

Expand All @@ -34,7 +35,7 @@
with DAG(
dag_id='example_python_operator',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
) as dag:
Expand Down
4 changes: 2 additions & 2 deletions airflow/example_dags/example_short_circuit_operator.py
Expand Up @@ -17,7 +17,7 @@
# under the License.

"""Example DAG demonstrating the usage of the ShortCircuitOperator."""
from datetime import datetime
import pendulum

from airflow import DAG
from airflow.models.baseoperator import chain
Expand All @@ -27,7 +27,7 @@

with DAG(
dag_id='example_short_circuit_operator',
start_date=datetime(2021, 1, 1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
) as dag:
Expand Down
9 changes: 7 additions & 2 deletions airflow/example_dags/example_skip_dag.py
Expand Up @@ -18,7 +18,7 @@

"""Example DAG demonstrating the DummyOperator and a custom DummySkipOperator which skips by default."""

from datetime import datetime
import pendulum

from airflow import DAG
from airflow.exceptions import AirflowSkipException
Expand Down Expand Up @@ -55,6 +55,11 @@ def create_test_pipeline(suffix, trigger_rule):
join >> final


with DAG(dag_id='example_skip_dag', start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) as dag:
with DAG(
dag_id='example_skip_dag',
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
) as dag:
create_test_pipeline('1', TriggerRule.ALL_SUCCESS)
create_test_pipeline('2', TriggerRule.ONE_SUCCESS)