Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add deferrable support to DatabricksNotebookOperator #39295

Merged
merged 28 commits into from May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
95d3a77
add deferrable support to DatabricksNotebookOperator
rawwar Apr 28, 2024
0d5cd7c
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar May 3, 2024
e7db2d3
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar May 4, 2024
054cf64
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar May 5, 2024
fb78207
Merge branch 'main' into kalyan/db-notebook-deferrable-support
rawwar May 5, 2024
59dc579
refactor defer call
rawwar May 6, 2024
521d921
Merge branch 'main' into kalyan/db-notebook-deferrable-support
rawwar May 6, 2024
e646a06
update caller
rawwar May 6, 2024
798bc22
update caller
rawwar May 6, 2024
e27966f
Update airflow/providers/databricks/operators/databricks.py
rawwar May 6, 2024
5c88c49
fix issue with repair_run check
rawwar May 6, 2024
cfdbcaf
Merge branch 'main' into kalyan/db-notebook-deferrable-support
rawwar May 6, 2024
a7ee133
update logs for failed state
rawwar May 6, 2024
1618ab7
Merge branch 'main' into kalyan/db-notebook-deferrable-support
rawwar May 6, 2024
dfecbf0
rewrite execute_complete
rawwar May 7, 2024
fcd550b
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar May 11, 2024
59a0f47
add test
rawwar May 11, 2024
1cb5b69
add test for termination before defer
rawwar May 11, 2024
52793ee
call execute in tests
rawwar May 11, 2024
dbf25f5
add run id in tests
rawwar May 11, 2024
5198c44
update execption message when job not successful
rawwar May 11, 2024
3a5494b
update error message in tests
rawwar May 11, 2024
46204df
fix tests
rawwar May 12, 2024
fd4041e
refactor tests
rawwar May 12, 2024
7c2dc0f
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar May 12, 2024
0698252
update execute_complete
rawwar May 12, 2024
5e8414e
Merge branch 'main' of https://github.com/apache/airflow into kalyan/…
rawwar May 13, 2024
e853173
assert Trigger type in deferrable test
rawwar May 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 21 additions & 1 deletion airflow/providers/databricks/operators/databricks.py
Expand Up @@ -926,6 +926,7 @@ class DatabricksNotebookOperator(BaseOperator):
"""

template_fields = ("notebook_params",)
CALLER = "DatabricksNotebookOperator"
rawwar marked this conversation as resolved.
Show resolved Hide resolved

def __init__(
self,
Expand All @@ -942,6 +943,7 @@ def __init__(
databricks_retry_args: dict[Any, Any] | None = None,
wait_for_termination: bool = True,
databricks_conn_id: str = "databricks_default",
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
**kwargs: Any,
):
self.notebook_path = notebook_path
Expand All @@ -958,11 +960,12 @@ def __init__(
self.wait_for_termination = wait_for_termination
self.databricks_conn_id = databricks_conn_id
self.databricks_run_id: int | None = None
self.deferrable = deferrable
super().__init__(**kwargs)

@cached_property
def _hook(self) -> DatabricksHook:
return self._get_hook(caller="DatabricksNotebookOperator")
return self._get_hook(caller=self.CALLER)

def _get_hook(self, caller: str) -> DatabricksHook:
return DatabricksHook(
Expand Down Expand Up @@ -1041,7 +1044,21 @@ def monitor_databricks_job(self) -> None:
run = self._hook.get_run(self.databricks_run_id)
run_state = RunState(**run["state"])
self.log.info("Current state of the job: %s", run_state.life_cycle_state)

while not run_state.is_terminal:
if self.deferrable:
rawwar marked this conversation as resolved.
Show resolved Hide resolved
return self.defer(
trigger=DatabricksExecutionTrigger(
run_id=self.databricks_run_id,
databricks_conn_id=self.databricks_conn_id,
polling_period_seconds=self.polling_period_seconds,
retry_limit=self.databricks_retry_limit,
retry_delay=self.databricks_retry_delay,
retry_args=self.databricks_retry_args,
caller=self.CALLER,
),
method_name=DEFER_METHOD_NAME,
)
time.sleep(self.polling_period_seconds)
run = self._hook.get_run(self.databricks_run_id)
run_state = RunState(**run["state"])
Expand All @@ -1066,3 +1083,6 @@ def execute(self, context: Context) -> None:
self.launch_notebook_job()
if self.wait_for_termination:
self.monitor_databricks_job()

def execute_complete(self, context: dict | None, event: dict):
rawwar marked this conversation as resolved.
Show resolved Hide resolved
_handle_deferrable_databricks_operator_completion(event, self.log)
rawwar marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions airflow/providers/databricks/triggers/databricks.py
Expand Up @@ -48,6 +48,7 @@ def __init__(
retry_args: dict[Any, Any] | None = None,
run_page_url: str | None = None,
repair_run: bool = False,
caller: str = "DatabricksExecutionTrigger",
) -> None:
super().__init__()
self.run_id = run_id
Expand All @@ -63,6 +64,7 @@ def __init__(
retry_limit=self.retry_limit,
retry_delay=self.retry_delay,
retry_args=retry_args,
caller=caller,
)

def serialize(self) -> tuple[str, dict[str, Any]]:
Expand Down