Skip to content

Commit

Permalink
Determine fail_stop on client side when db isolated (#39258)
Browse files Browse the repository at this point in the history
* Determine fail_stop on client side when db isolated

This is needed because we do not ser the dag on Operator objects.

* fix tests
  • Loading branch information
dstandish committed May 4, 2024
1 parent 8d29a96 commit 3d14213
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
19 changes: 17 additions & 2 deletions airflow/models/taskinstance.py
Expand Up @@ -880,6 +880,7 @@ def _handle_failure(
test_mode: bool | None = None,
context: Context | None = None,
force_fail: bool = False,
fail_stop: bool = False,
) -> None:
"""
Handle Failure for a task instance.
Expand All @@ -903,6 +904,7 @@ def _handle_failure(
context=context,
force_fail=force_fail,
session=session,
fail_stop=fail_stop,
)

_log_state(task_instance=task_instance, lead_msg="Immediate failure requested. " if force_fail else "")
Expand Down Expand Up @@ -2966,8 +2968,13 @@ def fetch_handle_failure_context(
context: Context | None = None,
force_fail: bool = False,
session: Session = NEW_SESSION,
fail_stop: bool = False,
):
"""Handle Failure for the TaskInstance."""
"""
Handle Failure for the TaskInstance.
:param fail_stop: if true, stop remaining tasks in dag
"""
get_listener_manager().hook.on_task_instance_failed(
previous_state=TaskInstanceState.RUNNING, task_instance=ti, error=error, session=session
)
Expand Down Expand Up @@ -3030,7 +3037,7 @@ def fetch_handle_failure_context(
email_for_state = operator.attrgetter("email_on_failure")
callbacks = task.on_failure_callback if task else None

if task and task.dag and task.dag.fail_stop:
if task and fail_stop:
_stop_remaining_tasks(task_instance=ti, session=session)
else:
if ti.state == TaskInstanceState.QUEUED:
Expand Down Expand Up @@ -3079,13 +3086,21 @@ def handle_failure(
:param context: Jinja2 context
:param force_fail: if True, task does not retry
"""
if TYPE_CHECKING:
assert self.task
assert self.task.dag
try:
fail_stop = self.task.dag.fail_stop
except Exception:
fail_stop = False
_handle_failure(
task_instance=self,
error=error,
session=session,
test_mode=test_mode,
context=context,
force_fail=force_fail,
fail_stop=fail_stop,
)

def is_eligible_to_retry(self):
Expand Down
8 changes: 8 additions & 0 deletions airflow/serialization/pydantic/taskinstance.py
Expand Up @@ -276,13 +276,21 @@ def handle_failure(
"""
from airflow.models.taskinstance import _handle_failure

if TYPE_CHECKING:
assert self.task
assert self.task.dag
try:
fail_stop = self.task.dag.fail_stop
except Exception:
fail_stop = False
_handle_failure(
task_instance=self,
error=error,
session=session,
test_mode=test_mode,
context=context,
force_fail=force_fail,
fail_stop=fail_stop,
)

def refresh_from_task(self, task: Operator, pool_override: str | None = None) -> None:
Expand Down

0 comments on commit 3d14213

Please sign in to comment.