Scheduler enters crash loop in certain cases with dynamic task mapping #25681

Closed
1 of 2 tasks
rcheatham-q opened this issue Aug 11, 2022 · 11 comments · Fixed by #25788
Labels
area:core kind:bug This is clearly a bug

Comments

rcheatham-q commented Aug 11, 2022

Apache Airflow version

2.3.3

What happened

The scheduler crashed when attempting to queue a dynamically mapped task that is directly downstream of, and depends only on, another dynamically mapped task.
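The report does not include the DAG source, but based on the log a minimal DAG with the described shape might look like the sketch below. The `bug_test` and `do_something` names come from the log; the downstream task name, the mapped input of 20 items, and the scheduling arguments are assumptions for illustration (Airflow 2.3 syntax):

```python
# Hypothetical minimal reproduction, NOT the reporter's actual DAG:
# one mapped task feeding directly into a second mapped task, with more
# mapped instances (20) than the DAG's max_active_tasks limit (16).
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval="@hourly", start_date=datetime(2022, 8, 11), catchup=False)
def bug_test():
    @task
    def do_something(n):
        return n + 1

    # Name assumed; the log only shows the upstream task.
    @task
    def do_something_else(n):
        print(n)

    # The downstream mapped task depends only on the upstream mapped task.
    do_something_else.expand(n=do_something.expand(n=list(range(20))))


bug_test()
```

With 20 mapped instances against the default `max_active_tasks` of 16, the log shows the last four instances being held back, which matches the "Not executing ... >= to the DAG's max_active_tasks limit of 16" lines below.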

scheduler.log
scheduler | ____________       _____________
scheduler | ____    |__( )_________  __/__  /________      __
scheduler | ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
scheduler | ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
scheduler | _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
scheduler | [2022-08-11 08:41:10,922] {scheduler_job.py:708} INFO - Starting the scheduler
scheduler | [2022-08-11 08:41:10,923] {scheduler_job.py:713} INFO - Processing each file at most -1 times
scheduler | [2022-08-11 08:41:10,926] {executor_loader.py:105} INFO - Loaded executor: SequentialExecutor
scheduler | [2022-08-11 08:41:10,929] {manager.py:160} INFO - Launched DagFileProcessorManager with pid: 52386
scheduler | [2022-08-11 08:41:10,932] {scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
scheduler | [2022-08-11 08:41:11 -0600] [52385] [INFO] Starting gunicorn 20.1.0
scheduler | [2022-08-11 08:41:11 -0600] [52385] [INFO] Listening at: http://0.0.0.0:8793 (52385)
scheduler | [2022-08-11 08:41:11 -0600] [52385] [INFO] Using worker: sync
scheduler | [2022-08-11 08:41:11 -0600] [52387] [INFO] Booting worker with pid: 52387
scheduler | [2022-08-11 08:41:11,656] {settings.py:55} INFO - Configured default timezone Timezone('UTC')
scheduler | [2022-08-11 08:41:11,659] {manager.py:406} WARNING - Because we cannot use more than 1 thread (parsing_processes = 2) when using sqlite. So we set parallelism to 1.
scheduler | [2022-08-11 08:41:11 -0600] [52388] [INFO] Booting worker with pid: 52388
scheduler | [2022-08-11 08:41:28,118] {dag.py:2968} INFO - Setting next_dagrun for bug_test to 2022-08-11T14:00:00+00:00, run_after=2022-08-11T15:00:00+00:00
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:353} INFO - 20 tasks up for execution:
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=0 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=1 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=2 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=3 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=4 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=5 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=6 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=7 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=8 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=9 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=10 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=11 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=12 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=13 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=14 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=15 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=16 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=17 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=18 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=19 [scheduled]>
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 0/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 1/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 2/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 3/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 4/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 5/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 6/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 7/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 8/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 9/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 10/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 11/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,161] {scheduler_job.py:418} INFO - DAG bug_test has 12/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:418} INFO - DAG bug_test has 13/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:418} INFO - DAG bug_test has 14/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:418} INFO - DAG bug_test has 15/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:418} INFO - DAG bug_test has 16/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:425} INFO - Not executing <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=16 [scheduled]> since the number of tasks running or queued from DAG bug_test is >= to the DAG's max_active_tasks limit of 16
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:418} INFO - DAG bug_test has 16/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:425} INFO - Not executing <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=17 [scheduled]> since the number of tasks running or queued from DAG bug_test is >= to the DAG's max_active_tasks limit of 16
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:418} INFO - DAG bug_test has 16/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:425} INFO - Not executing <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=18 [scheduled]> since the number of tasks running or queued from DAG bug_test is >= to the DAG's max_active_tasks limit of 16
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:418} INFO - DAG bug_test has 16/16 running and queued tasks
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:425} INFO - Not executing <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=19 [scheduled]> since the number of tasks running or queued from DAG bug_test is >= to the DAG's max_active_tasks limit of 16
scheduler | [2022-08-11 08:41:28,162] {scheduler_job.py:504} INFO - Setting the following tasks to queued state:
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=0 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=1 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=2 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=3 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=4 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=5 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=6 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=7 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=8 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=9 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=10 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=11 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=12 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=13 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=14 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=15 [scheduled]>
scheduler | [2022-08-11 08:41:28,164] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=0) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,165] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '0']
scheduler | [2022-08-11 08:41:28,165] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=1) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,165] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '1']
scheduler | [2022-08-11 08:41:28,165] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=2) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,165] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '2']
scheduler | [2022-08-11 08:41:28,165] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=3) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,165] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '3']
scheduler | [2022-08-11 08:41:28,165] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=4) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,166] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '4']
scheduler | [2022-08-11 08:41:28,166] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=5) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,166] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '5']
scheduler | [2022-08-11 08:41:28,166] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=6) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,166] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '6']
scheduler | [2022-08-11 08:41:28,166] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=7) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,166] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '7']
scheduler | [2022-08-11 08:41:28,167] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=8) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,167] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '8']
scheduler | [2022-08-11 08:41:28,167] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=9) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,167] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '9']
scheduler | [2022-08-11 08:41:28,167] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=10) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,167] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '10']
scheduler | [2022-08-11 08:41:28,167] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=11) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,167] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '11']
scheduler | [2022-08-11 08:41:28,167] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=12) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,167] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '12']
scheduler | [2022-08-11 08:41:28,168] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=13) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,168] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '13']
scheduler | [2022-08-11 08:41:28,168] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=14) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,168] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '14']
scheduler | [2022-08-11 08:41:28,168] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=15) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:28,168] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '15']
scheduler | [2022-08-11 08:41:28,170] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '0']
scheduler | [2022-08-11 08:41:29,131] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:29,227] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=0 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:29,584] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '1']
scheduler | [2022-08-11 08:41:30,492] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:30,593] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=1 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:30,969] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '2']
scheduler | [2022-08-11 08:41:31,852] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:31,940] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=2 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:32,308] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '3']
scheduler | [2022-08-11 08:41:33,199] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:33,289] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=3 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:33,656] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '4']
scheduler | [2022-08-11 08:41:34,535] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:34,631] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=4 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:35,013] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '5']
scheduler | [2022-08-11 08:41:35,928] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:36,024] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=5 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:36,393] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '6']
scheduler | [2022-08-11 08:41:37,296] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:37,384] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=6 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:37,758] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '7']
scheduler | [2022-08-11 08:41:38,642] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:38,732] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=7 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:39,113] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '8']
scheduler | [2022-08-11 08:41:39,993] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:40,086] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=8 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:40,461] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '9']
scheduler | [2022-08-11 08:41:41,383] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:41,473] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=9 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:41,865] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '10']
scheduler | [2022-08-11 08:41:42,761] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:42,858] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=10 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:43,236] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '11']
scheduler | [2022-08-11 08:41:44,124] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:44,222] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=11 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:44,654] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '12']
scheduler | [2022-08-11 08:41:45,545] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:45,635] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=12 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:45,998] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '13']
scheduler | [2022-08-11 08:41:46,867] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:46,955] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=13 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:47,386] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '14']
scheduler | [2022-08-11 08:41:48,270] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:48,362] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=14 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:48,718] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '15']
scheduler | [2022-08-11 08:41:49,569] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:49,669] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=15 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:50,022] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,022] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,022] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,022] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,023] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:50,036] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=0, run_start_date=2022-08-11 14:41:29.255370+00:00, run_end_date=2022-08-11 14:41:29.390095+00:00, run_duration=0.134725, state=success, executor_state=success, try_number=1, max_tries=0, job_id=5, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52421
scheduler | [2022-08-11 08:41:50,036] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=1, run_start_date=2022-08-11 14:41:30.628702+00:00, run_end_date=2022-08-11 14:41:30.768539+00:00, run_duration=0.139837, state=success, executor_state=success, try_number=1, max_tries=0, job_id=6, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52423
scheduler | [2022-08-11 08:41:50,036] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=2, run_start_date=2022-08-11 14:41:31.968933+00:00, run_end_date=2022-08-11 14:41:32.112968+00:00, run_duration=0.144035, state=success, executor_state=success, try_number=1, max_tries=0, job_id=7, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52425
scheduler | [2022-08-11 08:41:50,036] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=3, run_start_date=2022-08-11 14:41:33.318972+00:00, run_end_date=2022-08-11 14:41:33.458203+00:00, run_duration=0.139231, state=success, executor_state=success, try_number=1, max_tries=0, job_id=8, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52429
scheduler | [2022-08-11 08:41:50,036] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=4, run_start_date=2022-08-11 14:41:34.663829+00:00, run_end_date=2022-08-11 14:41:34.811273+00:00, run_duration=0.147444, state=success, executor_state=success, try_number=1, max_tries=0, job_id=9, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52437
scheduler | [2022-08-11 08:41:50,037] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=5, run_start_date=2022-08-11 14:41:36.056658+00:00, run_end_date=2022-08-11 14:41:36.203243+00:00, run_duration=0.146585, state=success, executor_state=success, try_number=1, max_tries=0, job_id=10, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52440
scheduler | [2022-08-11 08:41:50,037] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=6, run_start_date=2022-08-11 14:41:37.412705+00:00, run_end_date=2022-08-11 14:41:37.550794+00:00, run_duration=0.138089, state=success, executor_state=success, try_number=1, max_tries=0, job_id=11, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52442
scheduler | [2022-08-11 08:41:50,037] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=7, run_start_date=2022-08-11 14:41:38.761691+00:00, run_end_date=2022-08-11 14:41:38.897424+00:00, run_duration=0.135733, state=success, executor_state=success, try_number=1, max_tries=0, job_id=12, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52446
scheduler | [2022-08-11 08:41:50,037] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=8, run_start_date=2022-08-11 14:41:40.119057+00:00, run_end_date=2022-08-11 14:41:40.262712+00:00, run_duration=0.143655, state=success, executor_state=success, try_number=1, max_tries=0, job_id=13, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52450
scheduler | [2022-08-11 08:41:50,037] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=9, run_start_date=2022-08-11 14:41:41.502857+00:00, run_end_date=2022-08-11 14:41:41.641680+00:00, run_duration=0.138823, state=success, executor_state=success, try_number=1, max_tries=0, job_id=14, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52452
scheduler | [2022-08-11 08:41:50,037] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=10, run_start_date=2022-08-11 14:41:42.889206+00:00, run_end_date=2022-08-11 14:41:43.030804+00:00, run_duration=0.141598, state=success, executor_state=success, try_number=1, max_tries=0, job_id=15, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52454
scheduler | [2022-08-11 08:41:50,037] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=11, run_start_date=2022-08-11 14:41:44.255197+00:00, run_end_date=2022-08-11 14:41:44.413457+00:00, run_duration=0.15826, state=success, executor_state=success, try_number=1, max_tries=0, job_id=16, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52461
scheduler | [2022-08-11 08:41:50,037] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=12, run_start_date=2022-08-11 14:41:45.665373+00:00, run_end_date=2022-08-11 14:41:45.803094+00:00, run_duration=0.137721, state=success, executor_state=success, try_number=1, max_tries=0, job_id=17, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52463
scheduler | [2022-08-11 08:41:50,038] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=13, run_start_date=2022-08-11 14:41:46.988348+00:00, run_end_date=2022-08-11 14:41:47.159584+00:00, run_duration=0.171236, state=success, executor_state=success, try_number=1, max_tries=0, job_id=18, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52465
scheduler | [2022-08-11 08:41:50,038] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=14, run_start_date=2022-08-11 14:41:48.393004+00:00, run_end_date=2022-08-11 14:41:48.533408+00:00, run_duration=0.140404, state=success, executor_state=success, try_number=1, max_tries=0, job_id=19, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52472
scheduler | [2022-08-11 08:41:50,038] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=15, run_start_date=2022-08-11 14:41:49.699253+00:00, run_end_date=2022-08-11 14:41:49.833084+00:00, run_duration=0.133831, state=success, executor_state=success, try_number=1, max_tries=0, job_id=20, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:28.163024+00:00, queued_by_job_id=4, pid=52476
scheduler | [2022-08-11 08:41:51,632] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=0 [success]>'
scheduler | [2022-08-11 08:41:51,633] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=1 [success]>'
scheduler | [2022-08-11 08:41:51,633] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=2 [success]>'
scheduler | [2022-08-11 08:41:51,633] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=3 [success]>'
scheduler | [2022-08-11 08:41:51,633] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=4 [success]>'
scheduler | [2022-08-11 08:41:51,633] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=5 [success]>'
scheduler | [2022-08-11 08:41:51,633] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=6 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=7 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=8 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=9 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=10 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=11 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=12 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=13 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=14 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=15 [success]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=16 [scheduled]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=17 [scheduled]>'
scheduler | [2022-08-11 08:41:51,634] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=18 [scheduled]>'
scheduler | [2022-08-11 08:41:51,635] {dagrun.py:912} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=19 [scheduled]>'
scheduler | [2022-08-11 08:41:51,636] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: bug_test.do_something_else scheduled__2022-08-11T13:00:00+00:00 [None]>'
scheduler | [2022-08-11 08:41:51,688] {scheduler_job.py:353} INFO - 4 tasks up for execution:
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=16 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=17 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=18 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=19 [scheduled]>
scheduler | [2022-08-11 08:41:51,688] {scheduler_job.py:418} INFO - DAG bug_test has 0/16 running and queued tasks
scheduler | [2022-08-11 08:41:51,688] {scheduler_job.py:418} INFO - DAG bug_test has 1/16 running and queued tasks
scheduler | [2022-08-11 08:41:51,688] {scheduler_job.py:418} INFO - DAG bug_test has 2/16 running and queued tasks
scheduler | [2022-08-11 08:41:51,688] {scheduler_job.py:418} INFO - DAG bug_test has 3/16 running and queued tasks
scheduler | [2022-08-11 08:41:51,688] {scheduler_job.py:504} INFO - Setting the following tasks to queued state:
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=16 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=17 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=18 [scheduled]>
scheduler | <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=19 [scheduled]>
scheduler | [2022-08-11 08:41:51,690] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=16) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:51,690] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '16']
scheduler | [2022-08-11 08:41:51,690] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=17) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:51,690] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '17']
scheduler | [2022-08-11 08:41:51,690] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=18) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:51,690] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '18']
scheduler | [2022-08-11 08:41:51,690] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='bug_test', task_id='do_something', run_id='scheduled__2022-08-11T13:00:00+00:00', try_number=1, map_index=19) to executor with priority 2 and queue default
scheduler | [2022-08-11 08:41:51,690] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '19']
scheduler | [2022-08-11 08:41:51,692] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '16']
scheduler | [2022-08-11 08:41:52,532] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:52,620] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=16 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:53,037] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '17']
scheduler | [2022-08-11 08:41:53,907] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:53,996] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=17 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:54,427] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '18']
scheduler | [2022-08-11 08:41:55,305] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:55,397] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=18 [queued]> on host somehost.com
scheduler | [2022-08-11 08:41:55,816] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '19']
scheduler | [2022-08-11 08:41:56,726] {dagbag.py:508} INFO - Filling up the DagBag from /path/to/test/dir/bug_test/dags/bug_test.py
scheduler | [2022-08-11 08:41:56,824] {task_command.py:371} INFO - Running <TaskInstance: bug_test.do_something scheduled__2022-08-11T13:00:00+00:00 map_index=19 [queued]> on host somehost.com
scheduler | Traceback (most recent call last):
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
scheduler | self.dialect.do_execute(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
scheduler | cursor.execute(statement, parameters)
scheduler | sqlite3.IntegrityError: UNIQUE constraint failed: task_instance.dag_id, task_instance.task_id, task_instance.run_id, task_instance.map_index
scheduler | 
scheduler | The above exception was the direct cause of the following exception:
scheduler | 
scheduler | Traceback (most recent call last):
scheduler | File "/path/to/test/dir/bug_test/.env/bin/airflow", line 8, in <module>
scheduler | sys.exit(main())
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
scheduler | args.func(args)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
scheduler | return func(*args, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
scheduler | return f(*args, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 377, in task_run
scheduler | _run_task_by_selected_method(args, dag, ti)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 183, in _run_task_by_selected_method
scheduler | _run_task_by_local_task_job(args, ti)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 241, in _run_task_by_local_task_job
scheduler | run_job.run()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
scheduler | self._execute()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 133, in _execute
scheduler | self.handle_task_exit(return_code)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 171, in handle_task_exit
scheduler | self._run_mini_scheduler_on_child_tasks()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/utils/session.py", line 71, in wrapper
scheduler | return func(*args, session=session, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/local_task_job.py", line 261, in _run_mini_scheduler_on_child_tasks
scheduler | info = dag_run.task_instance_scheduling_decisions(session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/dagrun.py", line 654, in task_instance_scheduling_decisions
scheduler | schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/dagrun.py", line 710, in _get_ready_tis
scheduler | expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 683, in expand_mapped_task
scheduler | session.flush()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
scheduler | self._flush(objects)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3485, in _flush
scheduler | transaction.rollback(_capture_exception=True)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
scheduler | compat.raise_(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
scheduler | raise exception
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
scheduler | flush_context.execute()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
scheduler | rec.execute(self)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
scheduler | util.preloaded.orm_persistence.save_obj(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
scheduler | _emit_update_statements(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
scheduler | c = connection._execute_20(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
scheduler | return meth(self, args_10style, kwargs_10style, execution_options)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
scheduler | return connection._execute_clauseelement(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
scheduler | ret = self._execute_context(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
scheduler | self._handle_dbapi_exception(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
scheduler | util.raise_(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
scheduler | raise exception
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
scheduler | self.dialect.do_execute(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
scheduler | cursor.execute(statement, parameters)
scheduler | sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: task_instance.dag_id, task_instance.task_id, task_instance.run_id, task_instance.map_index
scheduler | [SQL: UPDATE task_instance SET map_index=? WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
scheduler | [parameters: (0, 'do_something_else', 'bug_test', 'scheduled__2022-08-11T13:00:00+00:00', -1)]
scheduler | (Background on this error at: https://sqlalche.me/e/14/gkpj)
scheduler | [2022-08-11 08:41:57,311] {sequential_executor.py:66} ERROR - Failed to execute task Command '['airflow', 'tasks', 'run', 'bug_test', 'do_something', 'scheduled__2022-08-11T13:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/bug_test.py', '--map-index', '19']' returned non-zero exit status 1..
scheduler | [2022-08-11 08:41:57,313] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:57,313] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:57,313] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status success for try_number 1
scheduler | [2022-08-11 08:41:57,313] {scheduler_job.py:599} INFO - Executor reports execution of bug_test.do_something run_id=scheduled__2022-08-11T13:00:00+00:00 exited with status failed for try_number 1
scheduler | [2022-08-11 08:41:57,321] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=16, run_start_date=2022-08-11 14:41:52.649415+00:00, run_end_date=2022-08-11 14:41:52.787286+00:00, run_duration=0.137871, state=success, executor_state=success, try_number=1, max_tries=0, job_id=21, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:51.688924+00:00, queued_by_job_id=4, pid=52479
scheduler | [2022-08-11 08:41:57,321] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=17, run_start_date=2022-08-11 14:41:54.027712+00:00, run_end_date=2022-08-11 14:41:54.170371+00:00, run_duration=0.142659, state=success, executor_state=success, try_number=1, max_tries=0, job_id=22, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:51.688924+00:00, queued_by_job_id=4, pid=52484
scheduler | [2022-08-11 08:41:57,321] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=18, run_start_date=2022-08-11 14:41:55.426712+00:00, run_end_date=2022-08-11 14:41:55.566833+00:00, run_duration=0.140121, state=success, executor_state=success, try_number=1, max_tries=0, job_id=23, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:51.688924+00:00, queued_by_job_id=4, pid=52488
scheduler | [2022-08-11 08:41:57,321] {scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=bug_test, task_id=do_something, run_id=scheduled__2022-08-11T13:00:00+00:00, map_index=19, run_start_date=2022-08-11 14:41:56.859387+00:00, run_end_date=2022-08-11 14:41:57.018604+00:00, run_duration=0.159217, state=success, executor_state=failed, try_number=1, max_tries=0, job_id=24, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator, queued_dttm=2022-08-11 14:41:51.688924+00:00, queued_by_job_id=4, pid=52490
scheduler | [2022-08-11 08:41:57,403] {scheduler_job.py:768} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
scheduler | Traceback (most recent call last):
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
scheduler | self.dialect.do_execute(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
scheduler | cursor.execute(statement, parameters)
scheduler | sqlite3.IntegrityError: UNIQUE constraint failed: task_instance.dag_id, task_instance.task_id, task_instance.run_id, task_instance.map_index
scheduler | 
scheduler | The above exception was the direct cause of the following exception:
scheduler | 
scheduler | Traceback (most recent call last):
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 751, in _execute
scheduler | self._run_scheduler_loop()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
scheduler | num_queued_tis = self._do_scheduling(session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 921, in _do_scheduling
scheduler | callback_to_run = self._schedule_dag_run(dag_run, session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1163, in _schedule_dag_run
scheduler | schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/dagrun.py", line 524, in update_state
scheduler | info = self.task_instance_scheduling_decisions(session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/dagrun.py", line 654, in task_instance_scheduling_decisions
scheduler | schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/dagrun.py", line 710, in _get_ready_tis
scheduler | expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 683, in expand_mapped_task
scheduler | session.flush()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
scheduler | self._flush(objects)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3485, in _flush
scheduler | transaction.rollback(_capture_exception=True)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
scheduler | compat.raise_(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
scheduler | raise exception
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
scheduler | flush_context.execute()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
scheduler | rec.execute(self)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
scheduler | util.preloaded.orm_persistence.save_obj(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
scheduler | _emit_update_statements(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
scheduler | c = connection._execute_20(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
scheduler | return meth(self, args_10style, kwargs_10style, execution_options)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
scheduler | return connection._execute_clauseelement(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
scheduler | ret = self._execute_context(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
scheduler | self._handle_dbapi_exception(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
scheduler | util.raise_(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
scheduler | raise exception
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
scheduler | self.dialect.do_execute(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
scheduler | cursor.execute(statement, parameters)
scheduler | sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: task_instance.dag_id, task_instance.task_id, task_instance.run_id, task_instance.map_index
scheduler | [SQL: UPDATE task_instance SET map_index=? WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
scheduler | [parameters: (0, 'do_something_else', 'bug_test', 'scheduled__2022-08-11T13:00:00+00:00', -1)]
scheduler | (Background on this error at: https://sqlalche.me/e/14/gkpj)
scheduler | [2022-08-11 08:41:58,421] {process_utils.py:125} INFO - Sending Signals.SIGTERM to group 52386. PIDs of all processes in the group: [52386]
scheduler | [2022-08-11 08:41:58,421] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 52386
scheduler | [2022-08-11 08:41:58,609] {process_utils.py:75} INFO - Process psutil.Process(pid=52386, status='terminated', exitcode=0, started='08:41:10') (52386) terminated with exit code 0
scheduler | [2022-08-11 08:41:58,609] {scheduler_job.py:780} INFO - Exited execute loop
scheduler | Traceback (most recent call last):
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
scheduler | self.dialect.do_execute(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
scheduler | [2022-08-11 08:41:58 -0600] [52385] [INFO] Handling signal: term
scheduler | cursor.execute(statement, parameters)
scheduler | sqlite3.IntegrityError: UNIQUE constraint failed: task_instance.dag_id, task_instance.task_id, task_instance.run_id, task_instance.map_index
scheduler | 
scheduler | The above exception was the direct cause of the following exception:
scheduler | 
scheduler | Traceback (most recent call last):
scheduler | File "/path/to/test/dir/bug_test/.env/bin/airflow", line 8, in <module>
scheduler | sys.exit(main())
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/__main__.py", line 38, in main
scheduler | args.func(args)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 51, in command
scheduler | return func(*args, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/utils/cli.py", line 99, in wrapper
scheduler | return f(*args, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
scheduler | _run_scheduler_job(args=args)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
scheduler | job.run()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/base_job.py", line 244, in run
scheduler | self._execute()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 751, in _execute
scheduler | self._run_scheduler_loop()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
scheduler | num_queued_tis = self._do_scheduling(session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 921, in _do_scheduling
scheduler | [2022-08-11 08:41:58 -0600] [52387] [INFO] Worker exiting (pid: 52387)
scheduler | [2022-08-11 08:41:58 -0600] [52388] [INFO] Worker exiting (pid: 52388)
scheduler | callback_to_run = self._schedule_dag_run(dag_run, session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py", line 1163, in _schedule_dag_run
scheduler | schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/dagrun.py", line 524, in update_state
scheduler | info = self.task_instance_scheduling_decisions(session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/utils/session.py", line 68, in wrapper
scheduler | return func(*args, **kwargs)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/dagrun.py", line 654, in task_instance_scheduling_decisions
scheduler | schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/dagrun.py", line 710, in _get_ready_tis
scheduler | expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 683, in expand_mapped_task
scheduler | session.flush()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
scheduler | self._flush(objects)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3485, in _flush
scheduler | transaction.rollback(_capture_exception=True)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
scheduler | compat.raise_(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
scheduler | raise exception
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
scheduler | flush_context.execute()
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
scheduler | rec.execute(self)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
scheduler | util.preloaded.orm_persistence.save_obj(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
scheduler | _emit_update_statements(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
scheduler | c = connection._execute_20(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
scheduler | return meth(self, args_10style, kwargs_10style, execution_options)
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
scheduler | return connection._execute_clauseelement(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
scheduler | ret = self._execute_context(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
scheduler | self._handle_dbapi_exception(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
scheduler | util.raise_(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
scheduler | raise exception
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
scheduler | self.dialect.do_execute(
scheduler | File "/path/to/test/dir/bug_test/.env/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
scheduler | cursor.execute(statement, parameters)
scheduler | sqlalchemy.exc.IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: task_instance.dag_id, task_instance.task_id, task_instance.run_id, task_instance.map_index
scheduler | [SQL: UPDATE task_instance SET map_index=? WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
scheduler | [parameters: (0, 'do_something_else', 'bug_test', 'scheduled__2022-08-11T13:00:00+00:00', -1)]
scheduler | (Background on this error at: https://sqlalche.me/e/14/gkpj)
scheduler | [2022-08-11 08:41:58 -0600] [52385] [INFO] Shutting down: Master

What you think should happen instead

The scheduler does not crash and the dynamically mapped task executes normally

How to reproduce

Setup

  • one DAG with two tasks, one directly downstream of the other
  • the DAG has a schedule (e.g. @hourly)
  • both tasks use task expansion
  • the second task uses the output of the first task as its expansion parameter
  • the scheduler's pool size is smaller than the number of map indices in each task

Steps to reproduce

  1. enable the DAG and let it run

Operating System

macOS and Dockerized Linux on macOS

Versions of Apache Airflow Providers

None

Deployment

Other

Deployment details

I have tested and confirmed this bug is present in three separate deployments:

  1. airflow standalone
  2. DaskExecutor using docker compose
  3. KubernetesExecutor using Docker Desktop's builtin Kubernetes cluster

All three of these deployments were executed locally on a MacBook Pro.

1. airflow standalone

I created a new Python 3.9 virtual environment, installed Airflow 2.3.3, configured a few environment variables, and executed airflow standalone. Here is a bash script that completes all of these tasks:

airflow_standalone.sh
#!/bin/bash

# ensure working dir is correct
DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
cd $DIR

set -x

# set version parameters
AIRFLOW_VERSION="2.3.3"
PYTHON_VERSION="3.9"

# configure Python environment
if [ ! -d "$DIR/.env" ]
then
    python3 -m venv "$DIR/.env"
fi

source "$DIR/.env/bin/activate"
pip install --upgrade pip

# install Airflow
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# configure Airflow
export AIRFLOW_HOME="$DIR/.airflow"
export AIRFLOW__CORE__DAGS_FOLDER="$DIR/dags"
export AIRFLOW__CORE__LOAD_EXAMPLES="False"
export AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS="False"

# start Airflow
exec "$DIR/.env/bin/airflow" standalone

Here is the DAG code, which can be placed in a dags directory in the same location as the above script. Note that this DAG code triggers the bug in all environments I tested.

bug_test.py
import pendulum
from airflow.decorators import dag, task

@dag(
    'bug_test',
    schedule_interval='@hourly',
    start_date=pendulum.now().add(hours=-2)
)
def test_scheduler_bug():
    @task
    def do_something(i):
        return i + 10

    @task
    def do_something_else(i):
        import logging
        log = logging.getLogger('airflow.task')
        log.info("I'll never run")

    nums = do_something.expand(i=[i+1 for i in range(20)])
    do_something_else.expand(i=nums)

TEST_DAG = test_scheduler_bug()

Once set up, simply activating the DAG will demonstrate the bug.

2. DaskExecutor on docker compose with Postgres 12

I cannot provide a full replication of this setup as it is rather involved. The Docker image starts from python:3.9-slim and installs Airflow with the appropriate constraints. It has many additional packages installed, both system and Python, and a custom entrypoint that can run the Dask scheduler in addition to regular Airflow commands. Here are the applicable Airflow configuration values:

airflow.cfg
[core]
donot_pickle = False
executor = DaskExecutor
load_examples = False
max_active_tasks_per_dag = 16
parallelism = 4

[scheduler]
dag_dir_list_interval = 0
catchup_by_default = False
parsing_processes = 3
scheduler_health_check_threshold = 90

Here is a docker-compose file that is nearly identical to the one I use (I just removed unrelated bits):

docker-compose.yml
version: '3.7'

services:
    metastore:
        image: postgres:12-alpine
        ports:
            - 5432:5432
        container_name: airflow-metastore
        volumes:
            - ${AIRFLOW_HOME_DIR}/pgdata:/var/lib/postgresql/data
        environment:
            POSTGRES_USER: airflow
            POSTGRES_PASSWORD: ${AIRFLOW_DB_PASSWORD}
            PGDATA: /var/lib/postgresql/data/pgdata

    airflow-webserver:
        image: 'my_custom_image:tag'
        ports:
            - '8080:8080'
        depends_on:
            - metastore
        container_name: airflow-webserver
        environment:
            AIRFLOW_HOME: /opt/airflow
            AIRFLOW__WEBSERVER__SECRET_KEY: ${AIRFLOW_SECRET_KEY}
            AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
            AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD}@metastore:5432/${AIRFLOW_DB_DATABASE}
        env_file: container_vars.env
        command: 
            - webserver
            - --daemon
            - --access-logfile
            - /var/log/airflow/webserver-access.log
            - --error-logfile
            - /var/log/airflow/webserver-errors.log
            - --log-file
            - /var/log/airflow/webserver.log
        volumes:
            - ${AIRFLOW_HOME_DIR}/logs:/var/log/airflow

    airflow-scheduler:
        image: 'my_custom_image:tag'
        depends_on:
            - metastore
            - dask-scheduler
        container_name: airflow-scheduler
        environment:
            AIRFLOW_HOME: /opt/airflow
            AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
            AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD}@metastore:5432/${AIRFLOW_DB_DATABASE}
            SCHEDULER_RESTART_INTERVAL: ${SCHEDULER_RESTART_INTERVAL}
        env_file: container_vars.env
        restart: unless-stopped
        command:
            - scheduler
            - --daemon
            - --log-file
            - /var/log/airflow/scheduler.log
        volumes:
            - ${AIRFLOW_HOME_DIR}/logs:/var/log/airflow

    dask-scheduler:
        image: 'my_custom_image:tag'
        ports:
            - 8787:8787
        container_name: airflow-dask-scheduler
        command:
            - dask-scheduler
        
    dask-worker:
        image: 'my_custom_image:tag'
        depends_on:
            - dask-scheduler
            - metastore
        container_name: airflow-worker
        environment:
            AIRFLOW_HOME: /opt/airflow
            AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
            AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:${AIRFLOW_DB_PASSWORD}@metastore:5432/${AIRFLOW_DB_DATABASE}
        env_file: container_vars.env
        command:
            - dask-worker
            - dask-scheduler:8786
            - --nprocs
            - '8'
            - --nthreads
            - '1'
        volumes:
            - ${AIRFLOW_HOME_DIR}/logs:/var/log/airflow

I also had to manually change the default pool size to 15 in the UI in order to trigger the bug. With the default pool set to 128 the bug did not trigger.

3. KubernetesExecutor on Docker Desktop builtin Kubernetes cluster with Postgres 11

This uses the official Airflow Helm Chart with the following values overrides:

values.yaml
defaultAirflowRepository: my_custom_image
defaultAirflowTag: "my_image_tag"

airflowVersion: "2.3.3"
executor: "KubernetesExecutor"

webserverSecretKeySecretName: airflow-webserver-secret-key
fernetKeySecretName: airflow-fernet-key

config:
  webserver:
    expose_config: 'True'
    base_url: http://localhost:8080
  scheduler:
    catchup_by_default: 'False'
  api:
    auth_backends: airflow.api.auth.backend.default

triggerer:
  enabled: false

statsd:
  enabled: false

redis:
  enabled: false

cleanup:
  enabled: false

logs:
  persistence:
    enabled: true

workers:
  extraVolumes:
  - name: airflow-dags
    hostPath:
      path: /local/path/to/dags
      type: Directory
  extraVolumeMounts:
  - name: airflow-dags
    mountPath: /opt/airflow/dags
    readOnly: true

scheduler:
  extraVolumes:
  - name: airflow-dags
    hostPath:
      path: /local/path/to/dags
      type: Directory
  extraVolumeMounts:
  - name: airflow-dags
    mountPath: /opt/airflow/dags
    readOnly: true

The docker image is the official airflow:2.3.3-python3.9 image with a single environment variable modified:

PYTHONPATH="/opt/airflow/dags/repo/dags:${PYTHONPATH}"

Anything else

This is my understanding of the timeline that produces the crash:

  1. The scheduler queues some of the subtasks in the first task
  2. Some subtasks run and yield their XCom results
  3. The scheduler runs, queueing the remainder of the subtasks for the first task and creates some subtasks in the second task using the XCom results produced thus far
  4. The remainder of the subtasks from the first task complete
  5. The scheduler attempts to recreate all of the subtasks of the second task, including the ones already created, and a unique constraint in the database is violated and the scheduler crashes
  6. When the scheduler restarts, it attempts the previous step again and crashes again, thus entering a crash loop

It seems that if some, but not all, subtasks for the second task have been created when the scheduler attempts to queue the mapped task, the scheduler tries to create all of the subtasks again, which violates the unique constraint.
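
The constraint violation in step 5 above can be sketched with a few lines of sqlite3 (a hypothetical mini-reproduction, not Airflow code): once one code path has already created the row for map_index=0, the in-place update of the unmapped map_index=-1 row attempted by the second expansion hits the same unique key seen in the traceback.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "  dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER,"
    "  UNIQUE (dag_id, task_id, run_id, map_index))"
)
key = ("bug_test", "do_something_else", "scheduled__2022-08-11T13:00:00+00:00")

# The unmapped placeholder TI that expansion wants to turn into index 0.
conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, -1)", key)

# Meanwhile another code path has already created index 0 for the
# partially expanded task (step 3 of the timeline above).
conn.execute("INSERT INTO task_instance VALUES (?, ?, ?, 0)", key)

# Step 5: the second expansion replays the in-place update and collides.
error = None
try:
    conn.execute(
        "UPDATE task_instance SET map_index = 0 "
        "WHERE dag_id = ? AND task_id = ? AND run_id = ? AND map_index = -1",
        key,
    )
except sqlite3.IntegrityError as exc:
    error = exc
print(error)
```

Running this prints the same `UNIQUE constraint failed: task_instance.dag_id, task_instance.task_id, task_instance.run_id, task_instance.map_index` message that appears in the scheduler log.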

NOTES

  • If the scheduler can queue at least as many tasks as there are map indices for the task, this won't happen. The
    provided test case succeeded on the DaskExecutor deployment when the default pool was 128; when I reduced that pool to 15, the bug occurred.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@rcheatham-q rcheatham-q added area:core kind:bug This is a clearly a bug labels Aug 11, 2022
@potiuk potiuk modified the milestone: Airflow 2.3.4 Aug 16, 2022
@potiuk
Member

potiuk commented Aug 16, 2022

CC: @ashb @uranusjr @ephraimbuddy - might be worth at least checking before we attempt to release 2.3.4

@ashb
Member

ashb commented Aug 16, 2022

Thanks for the amazingly detailed repro steps -- this should make our job much easier!

@ephraimbuddy
Contributor

Task log I saw (just a note for debugging):

[2022-08-16, 19:22:54 UTC] {taskinstance.py:1561} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-08-16, 19:22:54 UTC] {taskinstance.py:1909} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1471, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1575, in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2232, in render_templates
    rendered_task = self.task.render_template_fields(context)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/airflow/models/mappedoperator.py", line 724, in render_template_fields
    self._resolve_expansion_kwargs(kwargs, template_fields, context, session)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/airflow/decorators/base.py", line 458, in _resolve_expansion_kwargs
    v = self._expand_mapped_field(k, v, context, session=session)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/airflow/models/mappedoperator.py", line 757, in _expand_mapped_field
    all_lengths = self._resolve_map_lengths(context["run_id"], session=session)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/airflow/models/mappedoperator.py", line 597, in _resolve_map_lengths
    map_lengths = self._get_map_lengths(run_id, session=session)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/airflow/models/mappedoperator.py", line 570, in _get_map_lengths
    TaskMap.task_id.in_(non_mapped_dep_keys),
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 604, in in_
    return self.operate(in_op, other)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py", line 317, in operate
    return op(self.comparator, *other, **kwargs)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 1386, in in_op
    return a.in_(b)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 604, in in_
    return self.operate(in_op, other)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/orm/properties.py", line 431, in operate
    return op(self.__clause_element__(), *other, **kwargs)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 1386, in in_op
    return a.in_(b)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 604, in in_
    return self.operate(in_op, other)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 858, in operate
    return op(self.comparator, *other, **kwargs)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 1386, in in_op
    return a.in_(b)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/operators.py", line 604, in in_
    return self.operate(in_op, other)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/type_api.py", line 76, in operate
    return o[0](self.expr, op, *(other + o[1:]), **kwargs)
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/sqlalchemy/sql/default_comparator.py", line 165, in _in_impl
    return _boolean_compare(
  File "/home/ephraimbuddy/airflow/env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1562, in signal_handler
    self.task.on_kill()
AttributeError: 'DecoratedMappedOperator' object has no attribute 'on_kill'
[2022-08-16, 19:22:54 UTC] {taskinstance.py:1415} INFO - Marking task as FAILED. dag_id=bug_test, task_id=do_something_else, map_index=17, execution_date=20220816T182209, start_date=20220816T182254, end_date=20220816T182254
[2022-08-16, 19:22:54 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 82 for task do_something_else ('DecoratedMappedOperator' object has no attribute 'on_kill'; 2893)
[2022-08-16, 19:22:54 UTC] {process_utils.py:75} INFO - Process psutil.Process(pid=2893, status='terminated', exitcode=1, started='19:22:53') (2893) terminated with exit code 1

@ashb
Member

ashb commented Aug 16, 2022

@ephraimbuddy That error appears to be different from the one the OP reported, FWIW.

@ephraimbuddy
Contributor

@ephraimbuddy That error appears to be different from the one the OP reported, FWIW.

Yes. It's different.

@ephraimbuddy
Contributor

I was able to reproduce this on main by running the above DAG after reducing the default_pool slots to 5.

@uranusjr
Member

Is this fundamentally caused by the same issue underlying #25060 and #25200? (i.e. race condition caused by two parts of code expanding the same task)

@ephraimbuddy
Contributor

ephraimbuddy commented Aug 17, 2022

Is this fundamentally caused by the same issue underlying #25060 and #25200? (i.e. race condition caused by two parts of code expanding the same task)

It seems like a race condition: expand_mapped_task is being called multiple times.
This diff sheds more light on what's happening. It stops the crash, but shows that the unmapped TI with map_index=-1 was not actually updated to 0 during the first call to expand_mapped_task:

diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 2a93e442b7..b1d9693d32 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -655,6 +655,15 @@ class MappedOperator(AbstractOperator):
             else:
                 # Otherwise convert this into the first mapped index, and create
                 # TaskInstance for other indexes.
+                if session.query(TaskInstance).filter(
+                    TaskInstance.dag_id == self.dag_id,
+                    TaskInstance.run_id == run_id,
+                    TaskInstance.task_id == self.task_id,
+                    TaskInstance.map_index == 0
+                ).one_or_none():
+                    self.log.error("Checked ")
+                    unmapped_ti.state = TaskInstanceState.REMOVED
+                    return all_expanded_tis, total_length
                 unmapped_ti.map_index = 0
                 self.log.debug("Updated in place to become %s", unmapped_ti)
                 all_expanded_tis.append(unmapped_ti)

With the above, some other tasks end up being marked as removed too.

@ephraimbuddy
Contributor

One issue with the provided DAG is that it uses a dynamic start_date. With a dynamic start date, the DAG hashes won't match each time the file processor processes the file, which leads to the scheduler calling DagRun.verify_integrity.
If you change the DAG to have a static start_date and apply the below fix, the issue will be resolved:

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 8dcbfcfc60..c55be069e3 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1039,7 +1039,8 @@ class DagRun(Base, LoggingMixin):
         # Check if we have some missing indexes to create ti for
         missing_indexes: Dict["MappedOperator", Sequence[int]] = defaultdict(list)
         for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(expected_indexes[k]).difference(v))})
+            if len(v): # If list is empty, don't record
+                missing_indexes.update({k: list(set(expected_indexes[k]).difference(v))})
         return task_ids, missing_indexes
 
     def _get_task_creator(
@@ -1199,7 +1200,8 @@ class DagRun(Base, LoggingMixin):
             new_indexes[task] = range(new_length)
         missing_indexes: Dict[MappedOperator, Sequence[int]] = defaultdict(list)
         for k, v in existing_indexes.items():
-            missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
+            if len(v):
+                missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})
         return missing_indexes
 
     @staticmethod

I will make a PR to fix updating missing_indexes even when the value is empty; then we can work on the dynamic start_date issue for 2.4.0.
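
The effect of the guard in the diff above can be illustrated with a toy version of the bookkeeping (a hypothetical simplification, not the actual Airflow code):

```python
# Toy illustration of the missing-index bookkeeping patched in the diff above.
expected_indexes = {"do_something_else": list(range(20))}

# Race window: verify_integrity runs before any index of this task has been
# recorded as existing.
existing_indexes = {"do_something_else": []}

# Buggy behaviour: an empty existing list reports every index as missing,
# so the TIs get created here *and* again by expand_mapped_task.
missing_buggy = {
    k: sorted(set(expected_indexes[k]) - set(v))
    for k, v in existing_indexes.items()
}

# Patched behaviour: tasks with no recorded indexes are skipped entirely,
# leaving TI creation to expand_mapped_task alone.
missing_fixed = {
    k: sorted(set(expected_indexes[k]) - set(v))
    for k, v in existing_indexes.items()
    if v
}

print(len(missing_buggy["do_something_else"]))  # 20
print(missing_fixed)                            # {}
```

With the guard, verify_integrity no longer races expand_mapped_task to create the same rows, which is what triggered the UNIQUE constraint violation.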

@rcheatham-q
Author

If you change the dag to have a static start_date and apply the below fix, the issue will be resolved

Will that fix be a part of 2.3.4?

@ephraimbuddy
Contributor

ephraimbuddy commented Aug 18, 2022

If you change the dag to have a static start_date and apply the below fix, the issue will be resolved

Will that fix be a part of 2.3.4?

No, it won't. Sorry.
However, I'm working on fixing it.
