Handle db isolation for mapped operators and task groups #39259

Open
wants to merge 2 commits into
base: main

Conversation

dstandish
Contributor

No description provided.

Comment on lines +3191 to +3192
if isinstance(self.task, MappedOperator):
    self.task = context["ti"].task
Contributor Author

So, this is an interesting one @potiuk. Mapped operators are "expanded" (or "unmapped") inside MappedOperator.render_template_fields, which does so by replacing the task attr on the ti in the context dictionary. In the non-db-isolation case, that mutates what is self.task here! But in the db isolation case, the context dict is created via RPC, so the pydantic TI in the context dict is not the same object as the PydanticTI that is running... it's quite complicated. Anyway, this is one way to ensure that the task gets properly unmapped: we don't rely here on mutating the TI in the context dict.
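To make the aliasing problem concrete, here is a minimal sketch that uses plain stand-in classes rather than Airflow's own. `Task`, `TaskInstance`, `build_context_locally` and `build_context_over_rpc` are hypothetical names invented for this illustration, and `copy.deepcopy` is only an assumed substitute for the serialize/deserialize round trip of an RPC call.

```python
import copy
from dataclasses import dataclass


@dataclass
class Task:
    """Stand-in for an operator; `mapped` marks a not-yet-expanded task."""
    task_id: str
    mapped: bool = False


@dataclass
class TaskInstance:
    """Stand-in for a task instance that holds a reference to its task."""
    task: Task


def render_template_fields(context: dict) -> None:
    """Mimic the unmapping side effect: swap the task on context['ti']."""
    ti = context["ti"]
    ti.task = Task(task_id=ti.task.task_id, mapped=False)


def build_context_locally(ti: TaskInstance) -> dict:
    # Without DB isolation, the context holds the very same TI object.
    return {"ti": ti}


def build_context_over_rpc(ti: TaskInstance) -> dict:
    # Assumption: a deep copy stands in for the RPC round trip.
    return {"ti": copy.deepcopy(ti)}


# In-process: the mutation is visible on the running TI.
local_ti = TaskInstance(Task("t1", mapped=True))
render_template_fields(build_context_locally(local_ti))

# Over "RPC": only the copy in the context is unmapped.
rpc_ti = TaskInstance(Task("t1", mapped=True))
rpc_context = build_context_over_rpc(rpc_ti)
render_template_fields(rpc_context)
still_mapped_before_fix = rpc_ti.task.mapped

# The approach in the diff: copy the unmapped task back explicitly.
if rpc_ti.task.mapped:
    rpc_ti.task = rpc_context["ti"].task
```

In the in-process case the running TI sees the swap for free; in the copied case it only sees it after the explicit assignment, which is the role the two added lines play in the diff.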

@dstandish
Contributor Author

and here @uranusjr ?

Comment on lines +1033 to +1038
# when taking task over RPC, we need to add the dag back
if isinstance(task, MappedOperator):
    if not task.dag:
        task.dag = dag
elif not task._dag:
    task._dag = dag
Member

Not a fan for this… can we do this earlier in the stack, say when the task is created instead?

Contributor Author

dstandish, Apr 29, 2024

The issue, @uranusjr, is that this is early in the stack when it's an RPC call. The only earlier place we could do it is in the decorator. WDYT? We could stick it in a private function, though, to get it out of the way and reuse it within the module.

When it's not an RPC call, this has no effect and is not needed.

Comment on lines +3191 to +3192
if isinstance(self.task, MappedOperator):
    self.task = context["ti"].task
Member

Does this not work with BaseOperator? The conditional makes this a lot weirder.

Contributor Author

That's right, because ti.task is only mutated when it is a MappedOperator. Otherwise ti.task is the result of an RPC call and, long story short, it can't be used.

Contributor Author

dstandish, Apr 26, 2024

Does that make sense, @uranusjr?

So with a normal task, self.task is the task that is created locally, and there is no need to override it with the one in the context dict. If you did, you'd take a task object that isn't quite complete, essentially because we don't have proper serialization of Task, since there's no real Task entity and no TaskPydantic. Generally that's not a problem, because most of the time we don't need to serialize a task object.

In the MappedOperator case though, as we saw last night, "unmapping" is achieved by mutating the ti in the context dict, and it relies on the assumption that the TI in the context dict is the same object as the one that is created locally and being run, which isn't true when the context comes from RPC.

If searching for alternatives, we could look at not relying on the context dict for this "unmapping". E.g. we could forward the "original" ti object to the thing doing the unmapping so we don't need to mutate what's in context.

Another option would be, upon receiving a fresh context dict over RPC, to replace the TIs in the context with the local TIPydantic object, or something to this effect. Then perhaps we could keep the context["ti"] mutation approach for unmapping.
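As a rough illustration of that second option, the sketch below rebinds the TI entries of a freshly received context to the local object. `rebind_local_ti` and the `TaskInstance` stand-in are hypothetical; the only thing taken from Airflow is that the context exposes the task instance under both `ti` and `task_instance`.

```python
from dataclasses import dataclass


@dataclass
class TaskInstance:
    """Stand-in for the locally running (pydantic) task instance."""
    task_id: str
    task: str = "mapped"


def rebind_local_ti(context: dict, local_ti: TaskInstance) -> dict:
    """Hypothetical helper: return a copy of the context whose TI entries
    point at the locally running TI instead of the RPC copy."""
    rebound = dict(context)
    for key in ("ti", "task_instance"):
        if key in rebound:
            rebound[key] = local_ti
    return rebound


local_ti = TaskInstance("t1")
remote_copy = TaskInstance("t1")  # what came back over RPC
rpc_context = {"ti": remote_copy, "task_instance": remote_copy, "ds": "2024-04-26"}

context = rebind_local_ti(rpc_context, local_ti)

# Unmapping by mutating context["ti"] now reaches the running TI again.
context["ti"].task = "unmapped"
```

With the rebinding in place, the existing mutation-based unmapping would not need a special case at the call site, at the cost of one extra step right after the RPC call.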

We could also look at changing the way we handle context over RPC. Currently it's just a "working" approach, but not optimal because there's no laziness. We could optimize by making each context object an accessor that is an RPC call (and we should do something like this), and something like that could help here too.
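One possible shape for such a lazy context, sketched with the standard library only. `LazyRpcContext` and `fake_rpc` are hypothetical names, and the fetch callable is merely an assumed placeholder for whatever RPC client would actually be used.

```python
from collections.abc import Mapping
from typing import Any, Callable, Iterable, Iterator


class LazyRpcContext(Mapping):
    """Hypothetical context whose values are fetched on first access, then cached."""

    def __init__(self, keys: Iterable[str], fetch: Callable[[str], Any]) -> None:
        self._keys = tuple(keys)
        self._fetch = fetch
        self._cache = {}

    def __getitem__(self, key: str) -> Any:
        if key not in self._keys:
            raise KeyError(key)
        if key not in self._cache:
            # One remote call per key, made only when the key is actually used.
            self._cache[key] = self._fetch(key)
        return self._cache[key]

    def __contains__(self, key: object) -> bool:
        # Membership checks must not trigger a remote call.
        return key in self._keys

    def __iter__(self) -> Iterator[str]:
        return iter(self._keys)

    def __len__(self) -> int:
        return len(self._keys)


calls = []


def fake_rpc(key: str) -> str:
    """Assumed stand-in for a real RPC call; records which keys were requested."""
    calls.append(key)
    return f"value-for-{key}"


context = LazyRpcContext(["ti", "dag_run", "ds"], fake_rpc)
first = context["ti"]
second = context["ti"]  # served from the cache, no second call
```

Because each entry is resolved through an accessor, the `ti` accessor could just as well hand back the local object, which is where this idea would tie into the unmapping problem.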

Member

It makes sense, but if isinstance(self.task, MappedOperator) is an awkward condition to check for the case.

> upon receiving a fresh context dict over RPC, we could replace the TIs in the context with the local TIPydantic object

This sounds somewhat promising. Instead of just the ti, we could probably try to replace the entire relationship (including e.g. dag) so we can get rid of needing to pass in dag separately into _record_task_map_for_downstreams.

Contributor Author

> It makes sense, but if isinstance(self.task, MappedOperator) is an awkward condition to check for the case.

Yeah, I see what you're saying. It would be better for the code to "tell us" when an unmap has happened.

like when we call

original_task.render_template_fields(context, jinja_env)

that could, like, return a new task when it creates one. That would certainly make it more obvious what is going on, too.
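A minimal sketch of that idea, again with stand-in classes rather than the real operators. The return-value contract shown here (the unmapped task when one is created, otherwise `None`) is an assumption made for illustration, not the current behavior of `render_template_fields`.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Task:
    """Stand-in for an operator; `mapped` marks a not-yet-expanded task."""
    task_id: str
    mapped: bool = False

    def render_template_fields(self, context: dict) -> Optional["Task"]:
        """Assumed contract: return the unmapped task if rendering created
        one, otherwise None. The context is never mutated."""
        if self.mapped:
            return replace(self, mapped=False)
        return None


@dataclass
class TaskInstance:
    """Stand-in for the running task instance."""
    task: Task

    def render_templates(self, context: dict) -> Task:
        original_task = self.task
        unmapped = original_task.render_template_fields(context)
        if unmapped is not None:
            # Explicit signal: no isinstance check, no reliance on context["ti"].
            self.task = unmapped
        return original_task


mapped_ti = TaskInstance(Task("t1", mapped=True))
mapped_ti.render_templates({})

plain_task = Task("t2")
plain_ti = TaskInstance(plain_task)
plain_ti.render_templates({})
```

The caller then reacts to the return value alone, so the same code path would serve both the in-process and the RPC case.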


3 participants