New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle db isolation for mapped operators and task groups #39259
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -497,8 +497,14 @@ def _execute_callable(context: Context, **execute_callable_kwargs): | |
for key, value in xcom_value.items(): | ||
task_instance.xcom_push(key=key, value=value, session=session_or_null) | ||
task_instance.xcom_push(key=XCOM_RETURN_KEY, value=xcom_value, session=session_or_null) | ||
if TYPE_CHECKING: | ||
assert task_orig.dag | ||
_record_task_map_for_downstreams( | ||
task_instance=task_instance, task=task_orig, value=xcom_value, session=session_or_null | ||
task_instance=task_instance, | ||
task=task_orig, | ||
dag=task_orig.dag, | ||
value=xcom_value, | ||
session=session_or_null, | ||
) | ||
return result | ||
|
||
|
@@ -1003,25 +1009,40 @@ def _refresh_from_task( | |
task_instance_mutation_hook(task_instance) | ||
|
||
|
||
@internal_api_call | ||
@provide_session | ||
def _record_task_map_for_downstreams( | ||
*, task_instance: TaskInstance | TaskInstancePydantic, task: Operator, value: Any, session: Session | ||
*, | ||
task_instance: TaskInstance | TaskInstancePydantic, | ||
task: Operator, | ||
dag: DAG, | ||
value: Any, | ||
session: Session, | ||
) -> None: | ||
""" | ||
Record the task map for downstream tasks. | ||
|
||
:param task_instance: the task instance | ||
:param task: The task object | ||
:param dag: the dag associated with the task | ||
:param value: The value | ||
:param session: SQLAlchemy ORM Session | ||
|
||
:meta private: | ||
""" | ||
# when taking task over RPC, we need to add the dag back | ||
if isinstance(task, MappedOperator): | ||
if not task.dag: | ||
task.dag = dag | ||
elif not task._dag: | ||
task._dag = dag | ||
|
||
if next(task.iter_mapped_dependants(), None) is None: # No mapped dependants, no need to validate. | ||
return | ||
# TODO: We don't push TaskMap for mapped task instances because it's not | ||
# currently possible for a downstream to depend on one individual mapped | ||
# task instance. This will change when we implement task mapping inside | ||
# a mapped task group, and we'll need to further analyze the case. | ||
# currently possible for a downstream to depend on one individual mapped | ||
# task instance. This will change when we implement task mapping inside | ||
# a mapped task group, and we'll need to further analyze the case. | ||
Comment on lines
+1043
to
+1045
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Accidental? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no -- adding the indent will make it so all of this text is "part of" the todo (i.e. all show yellow in IDE) if we don't do this then it looks like separate comment... just a driveby "fix" but i can remove if you like |
||
if isinstance(task, MappedOperator): | ||
return | ||
if value is None: | ||
|
@@ -3167,6 +3188,8 @@ def render_templates( | |
# MappedOperator is useless for template rendering, and we need to be | ||
# able to access the unmapped task instead. | ||
original_task.render_template_fields(context, jinja_env) | ||
if isinstance(self.task, MappedOperator): | ||
self.task = context["ti"].task | ||
Comment on lines
+3191
to
+3192
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, this is an interesting one @potiuk. The way mapped operators are "expanded" or "unmapped"... it happens inside of
Comment on lines
+3191
to
+3192
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this not work with BaseOperator? The conditional makes this a lot weirder. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That’s right because only when it is mappedoperator is ti.task mutated. Otherwise ti.task is the result of rpc call and long story short it can’t be used There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that make sense @uranusjr ? so with normal task, in the mappedoperator case though, as we saw last night, "unmapping" is achieved by mutating the ti in the context dict, and it relies on the assumption that the TI in the context dict is the same object as the one that is created locally and being run, which isn't true when the context comes from RPC. if searching for alternatives, we could look at not relying on the context dict for this "unmapping". e.g. we could forword the "original" ti object to the thing doing the unmapping so we don't need to mutate what's in context. another option would be, upon receiving a fresh context dict over RPC, we could replace the TIs in the context with the local TIPydantic object -- or something to this effect. then perhaps we could keep the context["ti"] mutation approach for unmapping. we could also look at changing the way we handle context over RPC. currently it's just a "working" approach but not optimal because there's no laziness. we could optimize by making each context object an accessor that is an RPC call (and we should do something like this ). and something like that could help here too. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It makes sense, but
This sounds somewhat promising. Instead of just the ti, we could probably try to replace the entire relationship (including e.g. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
yeah, i see what you're saying. e.g. better would be for the code to "tell us" when an unmap has happened. like when we call
that could like... return a new task when it creates one. that would certainly make it more obvious what is going on too. |
||
|
||
return original_task | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a fan for this… can we do this earlier in the stack, say when the task is created instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the issue @uranusjr is that this is early in the stack when it's a RPC call. the only earlier place we could do it is in the decorator. WDYT? we could stick it in a private function though and get it out of the way and reuse in module though....
when not a RPC call, this has no effect and is not needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fyi @uranusjr this is resolved here (Use sentinel to elide the dag object on reserialization) but i can't make this PR yet because it's depending on too many other PRs to get merged first