Skip to content

Commit

Permalink
feat: soft_fail TriggerDagRunOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
raphaelauv committed Apr 22, 2024
1 parent aa5581e commit 2180556
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion airflow/operators/trigger_dagrun.py
Expand Up @@ -27,7 +27,7 @@

from airflow.api.common.trigger_dag import trigger_dag
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
from airflow.exceptions import AirflowException, AirflowSkipException, DagNotFound, DagRunAlreadyExists
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.dag import DagModel
Expand Down Expand Up @@ -89,6 +89,7 @@ class TriggerDagRunOperator(BaseOperator):
(default: 60)
:param allowed_states: List of allowed states, default is ``['success']``.
:param failed_states: List of failed or dis-allowed states, default is ``None``.
:param soft_fail: Set to true to mark the task as SKIPPED on DagRunAlreadyExists
:param deferrable: If waiting for completion, whether or not to defer the task until done,
default is ``False``.
"""
Expand All @@ -99,6 +100,7 @@ class TriggerDagRunOperator(BaseOperator):
"execution_date",
"conf",
"wait_for_completion",
"soft_fail",
)
template_fields_renderers = {"conf": "py"}
ui_color = "#ffefeb"
Expand All @@ -116,6 +118,7 @@ def __init__(
poke_interval: int = 60,
allowed_states: list[str] | None = None,
failed_states: list[str] | None = None,
soft_fail: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs,
) -> None:
Expand All @@ -134,6 +137,7 @@ def __init__(
self.failed_states = [DagRunState(s) for s in failed_states]
else:
self.failed_states = [DagRunState.FAILED]
self.soft_fail = soft_fail
self._defer = deferrable

if execution_date is not None and not isinstance(execution_date, (str, datetime.datetime)):
Expand Down Expand Up @@ -184,6 +188,10 @@ def execute(self, context: Context):
dag_run = e.dag_run
dag.clear(start_date=dag_run.execution_date, end_date=dag_run.execution_date)
else:
if self.soft_fail:
raise AirflowSkipException(
"Skipping due to soft_fail is set to True and DagRunAlreadyExists"
)
raise e
if dag_run is None:
raise RuntimeError("The dag_run should be set here!")
Expand Down

0 comments on commit 2180556

Please sign in to comment.