Skip to content

Commit

Permalink
Validate retries value on init for better errors (#16415)
Browse files Browse the repository at this point in the history
(cherry picked from commit 15ff238)
  • Loading branch information
uranusjr authored and ashb committed Jun 22, 2021
1 parent 7b9dd0b commit c3bc645
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
8 changes: 8 additions & 0 deletions airflow/models/baseoperator.py
Expand Up @@ -563,6 +563,14 @@ def __init__(
if wait_for_downstream:
self.depends_on_past = True

if retries is not None and not isinstance(retries, int):
try:
parsed_retries = int(retries)
except (TypeError, ValueError):
raise AirflowException(f"'retries' type must be int, not {type(retries).__name__}")
self.log.warning("Implicitly converting 'retries' for %s from %r to int", self, retries)
retries = parsed_retries

self.retries = retries
self.queue = queue
self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool
Expand Down
46 changes: 46 additions & 0 deletions tests/core/test_core.py
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.

import logging
import multiprocessing
import os
import signal
Expand Down Expand Up @@ -453,3 +454,48 @@ def test_dag_params_and_task_params(self):

assert context1['params'] == {'key_1': 'value_1', 'key_2': 'value_2_new', 'key_3': 'value_3'}
assert context2['params'] == {'key_1': 'value_1', 'key_2': 'value_2_old'}


@pytest.fixture()
def dag():
return DAG(TEST_DAG_ID, default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE})


def test_operator_retries_invalid(dag):
with pytest.raises(AirflowException) as ctx:
BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=dag,
retries='foo',
)
assert str(ctx.value) == "'retries' type must be int, not str"


def test_operator_retries_coerce(caplog, dag):
with caplog.at_level(logging.WARNING):
BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=dag,
retries='1',
)
assert caplog.record_tuples == [
(
"airflow.operators.bash.BashOperator",
logging.WARNING,
"Implicitly converting 'retries' for <Task(BashOperator): test_illegal_args> from '1' to int",
),
]


@pytest.mark.parametrize("retries", [None, 5])
def test_operator_retries(caplog, dag, retries):
with caplog.at_level(logging.WARNING):
BashOperator(
task_id='test_illegal_args',
bash_command='echo success',
dag=dag,
retries=retries,
)
assert caplog.records == []

0 comments on commit c3bc645

Please sign in to comment.