Fix deadlock when mapped task with removed upstream is rerun #26518
Conversation
When a dag with a mapped downstream task that depends on a mapped upstream task with some map indexes removed is rerun, we run into a deadlock, because the trigger rules evaluation does not account for removed task instances. The fix for the deadlock is to account for removed task instances where possible in the trigger rules. In this fix, I added a case where, if flag_upstream_failed is set, then the downstream of a removed task instance will also be removed. That is, if the upstream with map index 3 is removed, then the downstream with map index 3 will also be removed when flag_upstream_failed is set to True.
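The intended propagation can be sketched roughly as follows. This is a minimal standalone sketch with hypothetical names (TaskState, propagate_removed), not the actual Airflow code:

```python
from enum import Enum


class TaskState(str, Enum):
    SUCCESS = "success"
    REMOVED = "removed"


def propagate_removed(upstream_states, flag_upstream_failed):
    # Hypothetical sketch: when flag_upstream_failed is set, a downstream
    # mapped task instance whose upstream map index was removed is itself
    # marked removed, index for index.
    downstream = {}
    for map_index, state in enumerate(upstream_states):
        if flag_upstream_failed and state is TaskState.REMOVED:
            downstream[map_index] = TaskState.REMOVED
    return downstream


# Upstream map index 3 was removed, so downstream index 3 is removed too.
states = [TaskState.SUCCESS, TaskState.SUCCESS, TaskState.SUCCESS, TaskState.REMOVED]
result = propagate_removed(states, flag_upstream_failed=True)
```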
Force-pushed from b7da6a0 to 2584bcb.
Here's how you can reproduce this bug in main:

from datetime import datetime

from airflow.decorators import dag, task


@dag(
    'bug_test',
    schedule='@once',
    start_date=datetime(2022, 1, 1),
    max_active_runs=1,
)
def test_scheduler_bug():
    @task
    def do_something(i):
        return 6

    @task
    def do_something_else(i):
        import logging

        log = logging.getLogger('airflow.task')
        log.info("I'll never run")

    # After the run, reduce this range to 2
    nums = do_something.expand(i=[i + 1 for i in range(5)])
    do_something_else.expand(i=nums)


TEST_DAG = test_scheduler_bug()
@@ -162,6 +166,9 @@ def _evaluate_trigger_rule(
            changed = ti.set_state(State.UPSTREAM_FAILED, session)
        elif skipped:
            changed = ti.set_state(State.SKIPPED, session)
        elif removed and successes and ti.map_index > -1:
            if ti.map_index >= successes:
Why do we compare map_index with the number of upstream successes? That seems odd.
Hmm, yes, now you mention it, this feels like it's going to break in some other cases.
Like, what if there is one mapped upstream in the failed state and one in the removed state? This would erroneously remove it, I think?
@ephraimbuddy Could you take another look at this PR/case please?
Hmm, yes now you mention it this feels like it's going to break in some other cases. Like what if there is 1 mapped upstream which is in the failed state, one in the removed state, this would erroneously remove it I think?
In this case, successes will be 0 and failed will be 1, so the condition will not be reached and the task instance will be marked as upstream_failed. The same applies when we have skipped task instances: the condition to mark the task instance as removed will not be reached.

The condition for the task to be marked removed is that we have some removed task instances, some successful task instances, no failed, no skipped, and the task is mapped. So if we get here and the map_index of the task instance is >= the number of successful task instances, it means that task instance's upstream is removed, because indexes go from -1 upwards; it's not possible to remove map_index 1 and still have map_index 3.

If we have 5 mapped tasks (0, 1, 2, 3, 4) and we remove 2, we will have 3 mapped tasks (0, 1, 2). If these 3 are successful (successes=3), then the removed ones are those with map_index greater than or equal to 3, i.e. (3, 4).
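That arithmetic can be checked with a small standalone sketch. The helper name is hypothetical and it mirrors the condition under discussion, not Airflow's actual code:

```python
def upstream_is_removed(map_index, successes, removed, failed=0, skipped=0):
    # Mirrors the discussed condition: only when there are removed and
    # successful upstream instances, no failed, no skipped, and the task
    # is mapped (map_index > -1), is the instance treated as removed.
    if removed and successes and not failed and not skipped and map_index > -1:
        return map_index >= successes
    return False


# 5 mapped tasks (0..4), 2 removed: indexes 0, 1, 2 succeed (successes=3),
# so exactly indexes 3 and 4 are treated as removed.
removed_indexes = [i for i in range(5) if upstream_is_removed(i, successes=3, removed=2)]
```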
What if a task has multiple upstreams? [a, b] >> mapped_task(list_gen), for instance?
Edit: [a, b] >> mapped_task.map(list_gen), for instance?
And a is a success, b is a failure, and list_gen is reduced to only returning a single item?
This would apply:

airflow/airflow/ti_deps/deps/trigger_rule_dep.py, lines 165 to 166 in 0c7b4cb:

if upstream_failed or failed:
    changed = ti.set_state(State.UPSTREAM_FAILED, session)

Line 169 is only satisfied if we have removed, successes, no failed, no skipped, and a mapped task.
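The branch ordering can be illustrated with a small standalone sketch. The function name is hypothetical; the real logic lives in _evaluate_trigger_rule:

```python
def resolve_state(successes, skipped, failed, upstream_failed, removed, map_index):
    # Sketch of the branch order discussed above: the failure and skip
    # branches are evaluated before the removed branch, so a failed
    # upstream always wins over a removed one.
    if upstream_failed or failed:
        return "UPSTREAM_FAILED"
    if skipped:
        return "SKIPPED"
    if removed and successes and map_index > -1 and map_index >= successes:
        return "REMOVED"
    return None


# a succeeds, b fails, list_gen shrinks: failed=1, so the removed branch
# is never reached and the instance is marked upstream_failed.
state = resolve_state(successes=1, skipped=0, failed=1, upstream_failed=0, removed=1, map_index=1)
```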
Ahh, good. This is important enough functionality (it's the very core of Airflow) that we should add test cases covering things like this.
It does seem like it's covered here:

airflow/tests/models/test_taskinstance.py, lines 1065 to 1216 in 7d6d182:

    # Parameterized tests to check for the correct firing
    # of the trigger_rule under various circumstances
    # Numeric fields are in order:
    #   successes, skipped, failed, upstream_failed, done, removed
    @pytest.mark.parametrize(
        "trigger_rule,successes,skipped,failed,upstream_failed,done,removed,"
        "flag_upstream_failed,expect_state,expect_completed",
        [
            #
            # Tests for all_success
            #
            ['all_success', 5, 0, 0, 0, 0, 0, True, None, True],
            ['all_success', 2, 0, 0, 0, 0, 0, True, None, False],
            ['all_success', 2, 0, 1, 0, 0, 0, True, State.UPSTREAM_FAILED, False],
            ['all_success', 2, 1, 0, 0, 0, 0, True, State.SKIPPED, False],
            #
            # Tests for one_success
            #
            ['one_success', 5, 0, 0, 0, 5, 0, True, None, True],
            ['one_success', 2, 0, 0, 0, 2, 0, True, None, True],
            ['one_success', 2, 0, 1, 0, 3, 0, True, None, True],
            ['one_success', 2, 1, 0, 0, 3, 0, True, None, True],
            ['one_success', 0, 5, 0, 0, 5, 0, True, State.SKIPPED, False],
            ['one_success', 0, 4, 1, 0, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 3, 1, 1, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 4, 0, 1, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 0, 5, 0, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 0, 4, 1, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 0, 0, 5, 5, 0, True, State.UPSTREAM_FAILED, False],
            #
            # Tests for all_failed
            #
            ['all_failed', 5, 0, 0, 0, 5, 0, True, State.SKIPPED, False],
            ['all_failed', 0, 0, 5, 0, 5, 0, True, None, True],
            ['all_failed', 2, 0, 0, 0, 2, 0, True, State.SKIPPED, False],
            ['all_failed', 2, 0, 1, 0, 3, 0, True, State.SKIPPED, False],
            ['all_failed', 2, 1, 0, 0, 3, 0, True, State.SKIPPED, False],
            #
            # Tests for one_failed
            #
            ['one_failed', 5, 0, 0, 0, 0, 0, True, None, False],
            ['one_failed', 2, 0, 0, 0, 0, 0, True, None, False],
            ['one_failed', 2, 0, 1, 0, 0, 0, True, None, True],
            ['one_failed', 2, 1, 0, 0, 3, 0, True, None, False],
            ['one_failed', 2, 3, 0, 0, 5, 0, True, State.SKIPPED, False],
            #
            # Tests for done
            #
            ['all_done', 5, 0, 0, 0, 5, 0, True, None, True],
            ['all_done', 2, 0, 0, 0, 2, 0, True, None, False],
            ['all_done', 2, 0, 1, 0, 3, 0, True, None, False],
            ['all_done', 2, 1, 0, 0, 3, 0, True, None, False],
        ],
    )
    def test_check_task_dependencies(
        self,
        trigger_rule: str,
        successes: int,
        skipped: int,
        failed: int,
        removed: int,
        upstream_failed: int,
        done: int,
        flag_upstream_failed: bool,
        expect_state: State,
        expect_completed: bool,
        dag_maker,
    ):
        with dag_maker() as dag:
            downstream = EmptyOperator(task_id="downstream", trigger_rule=trigger_rule)
            for i in range(5):
                task = EmptyOperator(task_id=f'runme_{i}', dag=dag)
                task.set_downstream(downstream)
            assert task.start_date is not None
            run_date = task.start_date + datetime.timedelta(days=5)
        ti = dag_maker.create_dagrun(execution_date=run_date).get_task_instance(downstream.task_id)
        ti.task = downstream
        dep_results = TriggerRuleDep()._evaluate_trigger_rule(
            ti=ti,
            successes=successes,
            skipped=skipped,
            failed=failed,
            removed=removed,
            upstream_failed=upstream_failed,
            done=done,
            dep_context=DepContext(),
            flag_upstream_failed=flag_upstream_failed,
        )
        completed = all(dep.passed for dep in dep_results)
        assert completed == expect_completed
        assert ti.state == expect_state

    # Parameterized tests to check for the correct firing
    # of the trigger_rule under various circumstances of mapped task
    # Numeric fields are in order:
    #   successes, skipped, failed, upstream_failed, done, removed
    @pytest.mark.parametrize(
        "trigger_rule,successes,skipped,failed,upstream_failed,done,removed,"
        "flag_upstream_failed,expect_state,expect_completed",
        [
            #
            # Tests for all_success
            #
            ['all_success', 5, 0, 0, 0, 0, 0, True, None, True],
            ['all_success', 2, 0, 0, 0, 0, 0, True, None, False],
            ['all_success', 2, 0, 1, 0, 0, 0, True, State.UPSTREAM_FAILED, False],
            ['all_success', 2, 1, 0, 0, 0, 0, True, State.SKIPPED, False],
            ['all_success', 3, 0, 0, 0, 0, 2, True, State.REMOVED, True],  # ti.map_index >= successes
            #
            # Tests for one_success
            #
            ['one_success', 5, 0, 0, 0, 5, 0, True, None, True],
            ['one_success', 2, 0, 0, 0, 2, 0, True, None, True],
            ['one_success', 2, 0, 1, 0, 3, 0, True, None, True],
            ['one_success', 2, 1, 0, 0, 3, 0, True, None, True],
            ['one_success', 0, 5, 0, 0, 5, 0, True, State.SKIPPED, False],
            ['one_success', 0, 4, 1, 0, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 3, 1, 1, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 4, 0, 1, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 0, 5, 0, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 0, 4, 1, 5, 0, True, State.UPSTREAM_FAILED, False],
            ['one_success', 0, 0, 0, 5, 5, 0, True, State.UPSTREAM_FAILED, False],
            #
            # Tests for all_failed
            #
            ['all_failed', 5, 0, 0, 0, 5, 0, True, State.SKIPPED, False],
            ['all_failed', 0, 0, 5, 0, 5, 0, True, None, True],
            ['all_failed', 2, 0, 0, 0, 2, 0, True, State.SKIPPED, False],
            ['all_failed', 2, 0, 1, 0, 3, 0, True, State.SKIPPED, False],
            ['all_failed', 2, 1, 0, 0, 3, 0, True, State.SKIPPED, False],
            ['all_failed', 2, 1, 0, 0, 4, 1, True, State.SKIPPED, False],  # One removed
            #
            # Tests for one_failed
            #
            ['one_failed', 5, 0, 0, 0, 0, 0, True, None, False],
            ['one_failed', 2, 0, 0, 0, 0, 0, True, None, False],
            ['one_failed', 2, 0, 1, 0, 0, 0, True, None, True],
            ['one_failed', 2, 1, 0, 0, 3, 0, True, None, False],
            ['one_failed', 2, 3, 0, 0, 5, 0, True, State.SKIPPED, False],
            ['one_failed', 2, 2, 0, 0, 5, 1, True, State.SKIPPED, False],  # One removed
            #
            # Tests for done
            #
            ['all_done', 5, 0, 0, 0, 5, 0, True, None, True],
            ['all_done', 2, 0, 0, 0, 2, 0, True, None, False],
            ['all_done', 2, 0, 1, 0, 3, 0, True, None, False],
            ['all_done', 2, 1, 0, 0, 3, 0, True, None, False],
        ],
    )
    def test_check_task_dependencies_for_mapped(
Another option is removing this part altogether. It's not part of the deadlock issue, but I feel it's good-to-have stuff.
My reason is this: if at the first run the upstream had 3 map indexes and the downstream had 3 too (the upstream created the downstream), we have 3 -> 3 successes.
Then we reduce the upstream to 2, meaning one task is removed, and we clear and rerun the dag. Without this part of the change, we will end up running all 3 of the downstream instances: upstream (2 successful, 1 removed), downstream (3 successful).
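A quick count sketch of that scenario, with illustrative numbers only:

```python
# First run: 3 upstream map indexes created 3 downstream ones, all successful.
# On rerun the upstream is reduced to 2, so one map index is removed.
upstream_before, upstream_after = 3, 2
successes = upstream_after

# Without the change, all 3 downstream indexes would run again.
rerun_without_change = list(range(upstream_before))

# With the change, downstream indexes >= successes are marked removed instead.
rerun_with_change = [i for i in range(upstream_before) if i < successes]
marked_removed = [i for i in range(upstream_before) if i >= successes]
```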
It's not this one specific line, they are all like this and that's the worry.
The test you highlighted doesn't use mapped tasks so I don't think it covers the case I highlighted. Edit: sorry, original example didn't have a map. Added that.
(cherry picked from commit e91637f)