
Airflow scheduler crashes due to 'duplicate key value violates unique constraint "task_instance_pkey"' #25060

Closed
ldacey opened this issue Jul 14, 2022 · 14 comments · Fixed by #25532
Labels
area:core, kind:bug
Comments

@ldacey
Contributor

ldacey commented Jul 14, 2022

Apache Airflow version

2.3.3 (latest released)

What happened

Our schedulers have crashed on two occasions after upgrading to Airflow 2.3.3. The same DAG is responsible each time, but that is likely because it is the only dynamic task mapping DAG running right now (it is catching up on some historical data). This DAG uses the same imported @task function that many other DAGs have used successfully with no errors. The issue has only occurred after upgrading to Airflow 2.3.3.

What you think should happen instead

This error should not be raised - there should be no record of this task instance because, according to the UI, the task has not run yet. The extract task is green, but the transform task, which raised the error, is blank. The DAG run is stuck in the running state until the scheduler eventually dies and the Airflow banner notifies me that there is no scheduler heartbeat.

Also, this same DAG (and others which use the same imported external @task function) ran for hours before the upgrade to Airflow 2.3.3.

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index)=(oportun-five9-calls-ccvcc-v1, transform, scheduled__2022-06-04T14:05:00+00:00, 0) already exists.

[SQL: UPDATE task_instance SET map_index=%(map_index)s WHERE task_instance.task_id = %(task_instance_task_id)s AND task_instance.dag_id = %(task_instance_dag_id)s AND task_instance.run_id = %(task_instance_run_id)s AND task_instance.map_index = %(task_instance_map_index)s]
[parameters: {'map_index': 0, 'task_instance_task_id': 'transform', 'task_instance_dag_id': 'dag-id', 'task_instance_run_id': 'scheduled__2022-06-04T14:05:00+00:00', 'task_instance_map_index': -1}]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

[2022-07-13 22:49:25,323] {scheduler_job.py:780} INFO - Exited execute loop
[2022-07-13 22:49:55,880] {process_utils.py:75} INFO - Process psutil.Process(pid=143, status='terminated', started='22:49:52') (143) terminated with exit code None
[2022-07-13 22:49:56,001] {process_utils.py:240} INFO - Waiting up to 5 seconds for processes to exit...
[2022-07-13 22:49:56,014] {process_utils.py:75} INFO - Process psutil.Process(pid=144, status='terminated', started='22:49:52') (144) terminated with exit code None
[2022-07-13 22:49:56,016] {process_utils.py:75} INFO - Process psutil.Process(pid=140, status='terminated', exitcode=0, started='22:49:51') (140) terminated with exit code 0
[2022-07-13 22:49:56,018] {scheduler_job.py:780} INFO - Exited execute loop
[2022-07-13 22:50:23,909] {scheduler_job.py:780} INFO - Exited execute loop

How to reproduce

Run a dynamic task mapping DAG in Airflow 2.3.3
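
For concreteness, a minimal dynamic task mapping DAG of the shape described in this report; all names, the schedule, and the arguments below are illustrative placeholders, not the actual failing DAG:

import pendulum
from airflow.decorators import dag, task


@dag(
    schedule_interval="5 * * * *",  # illustrative hourly schedule
    start_date=pendulum.datetime(2022, 5, 10, tz="UTC"),
    catchup=True,       # backfilling historical runs, as in the report
    max_active_runs=1,  # only one active DAG run at a time
)
def mapped_repro():
    @task
    def extract():
        return ["path/a.csv", "path/b.csv"]  # list of saved file paths

    @task
    def transform(source):
        print("processing", source)

    # expand() creates one mapped "transform" task instance per path
    transform.expand(source=extract())


mapped_repro()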

Operating System

Ubuntu 20.04

Versions of Apache Airflow Providers

The 2.3.3 constraints file for Python 3.10 is used to pin the specific versions of these providers:

apache-airflow-providers-celery
apache-airflow-providers-docker
apache-airflow-providers-ftp
apache-airflow-providers-http
apache-airflow-providers-microsoft-azure
apache-airflow-providers-mysql
apache-airflow-providers-postgres
apache-airflow-providers-odbc
apache-airflow-providers-redis
apache-airflow-providers-salesforce
apache-airflow-providers-sftp
apache-airflow-providers-ssh
apache-airflow-providers-google

Deployment

Other Docker-based deployment

Deployment details

I am using two schedulers which run on separate nodes.

Anything else

The DAG allows only one active DAG run at a time. catchup=True is enabled, and it has been backfilling runs since the 05/10 start_date.

The extract() task returns a list of one or more files which have been saved to cloud storage. The transform task processes each of these paths dynamically. I have used these same tasks (imported from another file) in over 15 different DAGs so far without issue. The problem only appeared yesterday, sometime after updating Airflow to 2.3.3.

# DAG factory; the @dag decorator and the imports of these shared tasks are omitted in this excerpt
def dag_name():
    retrieved = extract()
    transform = transform_files(retrieved)
    finalize = finalize_dataset(transform)
    consolidate = consolidate_staging(transform)

    retrieved >> transform >> finalize >> consolidate

My transform_files task is just a function which expands the XComArg of the extract task and transforms each file. Nearly everything is driven by DAG params, which are customized in each DAG.


from airflow.decorators import task

# process_data is the shared transform function imported from another file
transform_file_task = task(process_data)


def transform_files(source):
    return (
        transform_file_task.override(task_id="transform")
        .partial(
            destination="{{ params.container }}/{{ dag.dag_id | dag_name }}/{{ ti.task_id }}",
            wasb_conn_id="{{ params.wasb_conn_id }}",
            pandas_options="{{ params.pandas_options }}",
            meta_columns="{{ params.meta_columns }}",
            script="{{ params.script }}",
            function_name="{{ params.function }}",
            schema_name="{{ params.schema }}",
            template="{{ dag.dag_id | dag_version }}-{{ ti.run_id }}-{{ ti.map_index }}",
            existing_data_behavior="overwrite_or_ignore",
            partition_columns="{{ params.partition_columns }}",
            dag_name="{{ dag.dag_id | dag_name }}",
            failure_recipients="{{ params.recipients }}",
            success_recipients="{{ params.recipients }}",
        )
        .expand(source=source)
    )

Deleting the DAG run which caused the error and restarting the Airflow scheduler fixes the issue temporarily; if I do not delete the DAG run, the scheduler keeps dying.
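
For anyone needing the same stopgap, here is a hedged sketch of that cleanup done programmatically rather than through the UI (it assumes direct access to the metadata database; in Airflow 2.x the task_instance rows reference dag_run with ON DELETE CASCADE, but verify against your schema before running anything like this):

from airflow.models import DagRun
from airflow.utils.session import create_session

# Placeholders: fill in the DAG run that keeps crashing the scheduler.
DAG_ID = "dag-id"
RUN_ID = "scheduled__2022-06-04T14:05:00+00:00"

with create_session() as session:
    session.query(DagRun).filter(
        DagRun.dag_id == DAG_ID,
        DagRun.run_id == RUN_ID,
    ).delete(synchronize_session=False)
# ...then restart the scheduler, as described above.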

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


@ldacey added the area:core and kind:bug labels on Jul 14, 2022
@potiuk
Member

potiuk commented Jul 15, 2022

Comment from @ldacey on Slack: this seems like a 2.3.3-only issue; 2.3.2 works fine.

@potiuk
Member

potiuk commented Jul 15, 2022

CC: @uranusjr @ashb - you might want to take a look; this seems like a regression in 2.3.3.

@NorthWindH

NorthWindH commented Jul 26, 2022

Just commenting that I'm also seeing this issue on 2.3.3 under similar circumstances.

@joshua-yeung-mox

We are also facing this issue when we use dynamic task mapping in 2.3.3.
The scheduler crashes a while after we enable the DAG that uses dynamic task mapping.
As a result, we needed to downgrade to 2.3.2.
But 2.3.2 has an issue related to Task Groups, where the UI shows incorrect status in the grid view (#24998). This affects our daily operations, as we cannot see the status directly in the grid view, only after expanding the task groups.
I wonder if this issue will be fixed in 2.3.4, and what is the target release date for 2.3.4?

@ephraimbuddy
Contributor

@ldacey, can you show the full log? It's not clear to me where this started to fail.

@sukso96100

sukso96100 commented Aug 4, 2022

I'm also facing this issue on 2.3.3 with dynamic task mapping, but using SQL Server for the metadata database. The error in the log is basically the same (inserting a duplicate key into 'dbo.task_instance' is not possible because it violates constraint 'task_instance_pkey'). After the error occurs, like for the other people here, tasks and DAGs get stuck in the "running" state until the Airflow banner notifies me that there is no scheduler heartbeat.

More info on my Airflow environment

  • Kubernetes (Azure Kubernetes Service)
  • Using CeleryKubernetesExecutor
  • SQL Server as metadata database
sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]PRIMARY KEY 제약 조건 'task_instance_pkey'을(를) 위반했습니다. 개체 'dbo.task_instance'에 중복 키를 삽입할 수 없습니다. 중복 키 값은 (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0)입니다. (2627) 

Error code 2627, from the list of SQL Server error codes, translates to:
Violation of PRIMARY KEY constraint 'task_instance_pkey'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0)

Full log:
/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py:52 DeprecationWarning: Passing filename_template to FileTaskHandler is deprecated and has no effect
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2022-08-04 00:36:48,513] {scheduler_job.py:708} INFO - Starting the scheduler
[2022-08-04 00:36:48,513] {scheduler_job.py:713} INFO - Processing each file at most -1 times
[2022-08-04 00:36:48,600] {default_celery.py:97} WARNING - You have configured a result_backend of redis://:gDkWKFckB2@airflow-redis:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2022-08-04 00:36:48,667] {kubernetes_executor.py:520} INFO - Start Kubernetes executor
[2022-08-04 00:36:48,707] {kubernetes_executor.py:128} INFO - Event: and now my watch begins starting at resource_version: 0
[2022-08-04 00:36:48,726] {kubernetes_executor.py:469} INFO - Found 0 queued task instances
[2022-08-04 00:36:48,735] {manager.py:160} INFO - Launched DagFileProcessorManager with pid: 33
[2022-08-04 00:36:48,737] {scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
[2022-08-04 00:36:48,750] {settings.py:55} INFO - Configured default timezone Timezone('UTC')
[2022-08-04 00:36:48,757] {settings.py:540} INFO - Loaded airflow_local_settings from /opt/airflow/config/airflow_local_settings.py .
/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py:52 DeprecationWarning: Passing filename_template to FileTaskHandler is deprecated and has no effect
[2022-08-04 00:36:50,327] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 [None]>'
[2022-08-04 00:36:50,328] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=0 [None]>'
[2022-08-04 00:36:50,329] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=1 [None]>'
[2022-08-04 00:36:50,331] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=2 [None]>'
[2022-08-04 00:36:50,332] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=3 [None]>'
[2022-08-04 00:36:50,332] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=4 [None]>'
[2022-08-04 00:36:50,333] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=5 [None]>'
[2022-08-04 00:36:50,333] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=6 [None]>'
[2022-08-04 00:36:50,335] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=7 [None]>'
[2022-08-04 00:36:50,335] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=8 [None]>'
[2022-08-04 00:36:50,336] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=9 [None]>'
[2022-08-04 00:36:50,336] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=10 [None]>'
[2022-08-04 00:36:50,338] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=11 [None]>'
[2022-08-04 00:36:50,338] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=12 [None]>'
[2022-08-04 00:36:50,339] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=13 [None]>'
[2022-08-04 00:36:50,340] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=14 [None]>'
[2022-08-04 00:36:50,340] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=15 [None]>'
[2022-08-04 00:36:50,343] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=16 [None]>'
[2022-08-04 00:36:50,343] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=17 [None]>'
[2022-08-04 00:36:50,344] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=18 [None]>'
[2022-08-04 00:36:50,344] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=19 [None]>'
[2022-08-04 00:36:50,345] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=20 [None]>'
[2022-08-04 00:36:50,345] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=21 [None]>'
[2022-08-04 00:36:50,346] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=22 [None]>'
[2022-08-04 00:36:50,346] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=23 [None]>'
[2022-08-04 00:36:50,711] {scheduler_job.py:768} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
pyodbc.IntegrityError: ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]PRIMARY KEY 제약 조건 'task_instance_pkey'을(를) 위반했습니다. 개체 'dbo.task_instance'에 중복 키를 삽입할 수 없습니다. 중복 키 값은 (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0)입니다. (2627) (SQLExecDirectW)")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 751, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 921, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1163, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 524, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 654, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 710, in _get_ready_tis
    expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 683, in expand_mapped_task
    session.flush()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3484, in _flush
    with util.safe_reraise():
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
    _emit_update_statements(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
    c = connection._execute_20(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]PRIMARY KEY 제약 조건 'task_instance_pkey'을(를) 위반했습니다. 개체 'dbo.task_instance'에 중복 키를 삽입할 수 없습니다. 중복 키 값은 (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0)입니다. (2627) (SQLExecDirectW)")
[SQL: UPDATE task_instance SET map_index=? WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
[parameters: (0, 'azplan_unbilled_lineitems', 'az_partner_etl_usage', 'scheduled__2022-08-02T00:00:00+00:00', -1)]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
[2022-08-04 00:36:50,724] {kubernetes_executor.py:821} INFO - Shutting down Kubernetes executor
[2022-08-04 00:36:51,788] {process_utils.py:125} INFO - Sending Signals.SIGTERM to group 33. PIDs of all processes in the group: [121, 122, 33]
[2022-08-04 00:36:51,788] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 33
[2022-08-04 00:36:52,868] {process_utils.py:240} INFO - Waiting up to 5 seconds for processes to exit...
[2022-08-04 00:36:52,875] {process_utils.py:75} INFO - Process psutil.Process(pid=121, status='terminated', started='00:36:51') (121) terminated with exit code None
[2022-08-04 00:36:52,875] {process_utils.py:75} INFO - Process psutil.Process(pid=122, status='terminated', started='00:36:51') (122) terminated with exit code None
[2022-08-04 00:36:52,876] {process_utils.py:75} INFO - Process psutil.Process(pid=33, status='terminated', exitcode=0, started='00:36:48') (33) terminated with exit code 0
[2022-08-04 00:36:52,876] {scheduler_job.py:780} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
pyodbc.IntegrityError: ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]PRIMARY KEY 제약 조건 'task_instance_pkey'을(를) 위반했습니다. 개체 'dbo.task_instance'에 중복 키를 삽입할 수 없습니다. 중복 키 값은 (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0)입니다. (2627) (SQLExecDirectW)")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 751, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 921, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1163, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 524, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 654, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 710, in _get_ready_tis
    expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 683, in expand_mapped_task
    session.flush()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3484, in _flush
    with util.safe_reraise():
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
    _emit_update_statements(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
    c = connection._execute_20(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]PRIMARY KEY 제약 조건 'task_instance_pkey'을(를) 위반했습니다. 개체 'dbo.task_instance'에 중복 키를 삽입할 수 없습니다. 중복 키 값은 (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0)입니다. (2627) (SQLExecDirectW)")
[SQL: UPDATE task_instance SET map_index=? WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
[parameters: (0, 'azplan_unbilled_lineitems', 'az_partner_etl_usage', 'scheduled__2022-08-02T00:00:00+00:00', -1)]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py:52 DeprecationWarning: Passing filename_template to FileTaskHandler is deprecated and has no effect
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2022-08-04 00:42:09,592] {scheduler_job.py:708} INFO - Starting the scheduler
[2022-08-04 00:42:09,592] {scheduler_job.py:713} INFO - Processing each file at most -1 times
[2022-08-04 00:42:09,698] {default_celery.py:97} WARNING - You have configured a result_backend of redis://:gDkWKFckB2@airflow-redis:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2022-08-04 00:42:09,775] {kubernetes_executor.py:520} INFO - Start Kubernetes executor
[2022-08-04 00:42:09,816] {kubernetes_executor.py:128} INFO - Event: and now my watch begins starting at resource_version: 0
[2022-08-04 00:42:09,836] {kubernetes_executor.py:469} INFO - Found 0 queued task instances
[2022-08-04 00:42:09,845] {manager.py:160} INFO - Launched DagFileProcessorManager with pid: 33
[2022-08-04 00:42:09,847] {scheduler_job.py:1233} INFO - Resetting orphaned tasks for active dag runs
[2022-08-04 00:42:09,863] {settings.py:55} INFO - Configured default timezone Timezone('UTC')
[2022-08-04 00:42:09,868] {settings.py:540} INFO - Loaded airflow_local_settings from /opt/airflow/config/airflow_local_settings.py .
/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py:52 DeprecationWarning: Passing filename_template to FileTaskHandler is deprecated and has no effect
[2022-08-04 00:42:12,059] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 [None]>'
[2022-08-04 00:42:12,060] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=0 [None]>'
[2022-08-04 00:42:12,060] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=1 [None]>'
[2022-08-04 00:42:12,061] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=2 [None]>'
[2022-08-04 00:42:12,061] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=3 [None]>'
[2022-08-04 00:42:12,061] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=4 [None]>'
[2022-08-04 00:42:12,062] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=5 [None]>'
[2022-08-04 00:42:12,063] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=6 [None]>'
[2022-08-04 00:42:12,063] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=7 [None]>'
[2022-08-04 00:42:12,067] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=8 [None]>'
[2022-08-04 00:42:12,067] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=9 [None]>'
[2022-08-04 00:42:12,068] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=10 [None]>'
[2022-08-04 00:42:12,068] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=11 [None]>'
[2022-08-04 00:42:12,068] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=12 [None]>'
[2022-08-04 00:42:12,069] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=13 [None]>'
[2022-08-04 00:42:12,069] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=14 [None]>'
[2022-08-04 00:42:12,070] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=15 [None]>'
[2022-08-04 00:42:12,070] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=16 [None]>'
[2022-08-04 00:42:12,071] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=17 [None]>'
[2022-08-04 00:42:12,071] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=18 [None]>'
[2022-08-04 00:42:12,072] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=19 [None]>'
[2022-08-04 00:42:12,075] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=20 [None]>'
[2022-08-04 00:42:12,075] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=21 [None]>'
[2022-08-04 00:42:12,076] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=22 [None]>'
[2022-08-04 00:42:12,076] {dagrun.py:937} INFO - Restoring mapped task '<TaskInstance: az_partner_etl_usage.azplan_unbilled_lineitems scheduled__2022-08-02T00:00:00+00:00 map_index=23 [None]>'
[2022-08-04 00:42:13,698] {process_utils.py:125} INFO - Sending Signals.SIGTERM to group 33. PIDs of all processes in the group: [118, 120, 33]
[2022-08-04 00:42:13,698] {process_utils.py:80} INFO - Sending the signal Signals.SIGTERM to group 33
[2022-08-04 00:42:14,155] {process_utils.py:240} INFO - Waiting up to 5 seconds for processes to exit...
[2022-08-04 00:42:14,161] {process_utils.py:240} INFO - Waiting up to 5 seconds for processes to exit...
[2022-08-04 00:42:14,208] {process_utils.py:75} INFO - Process psutil.Process(pid=33, status='terminated', exitcode=0, started='00:42:09') (33) terminated with exit code 0
[2022-08-04 00:42:14,209] {process_utils.py:75} INFO - Process psutil.Process(pid=118, status='terminated', started='00:42:12') (118) terminated with exit code None
[2022-08-04 00:42:14,209] {process_utils.py:75} INFO - Process psutil.Process(pid=120, status='terminated', started='00:42:13') (120) terminated with exit code None
[2022-08-04 00:42:14,210] {scheduler_job.py:780} INFO - Exited execute loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
pyodbc.IntegrityError: ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]PRIMARY KEY 제약 조건 'task_instance_pkey'을(를) 위반했습니다. 개체 'dbo.task_instance'에 중복 키를 삽입할 수 없습니다. 중복 키 값은 (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0)입니다. (2627) (SQLExecDirectW)")

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 751, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 839, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 921, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1163, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 524, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 654, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/dagrun.py", line 710, in _get_ready_tis
    expanded_tis, _ = schedulable.task.expand_mapped_task(self.run_id, session=session)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/mappedoperator.py", line 683, in expand_mapped_task
    session.flush()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3484, in _flush
    with util.safe_reraise():
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
    _emit_update_statements(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
    c = connection._execute_20(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]PRIMARY KEY 제약 조건 'task_instance_pkey'을(를) 위반했습니다. 개체 'dbo.task_instance'에 중복 키를 삽입할 수 없습니다. 중복 키 값은 (az_partner_etl_usage, azplan_unbilled_lineitems, scheduled__2022-08-02T00:00:00+00:00, 0)입니다. (2627) (SQLExecDirectW)")
[SQL: UPDATE task_instance SET map_index=? WHERE task_instance.task_id = ? AND task_instance.dag_id = ? AND task_instance.run_id = ? AND task_instance.map_index = ?]
[parameters: (0, 'azplan_unbilled_lineitems', 'az_partner_etl_usage', 'scheduled__2022-08-02T00:00:00+00:00', -1)]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

@ephraimbuddy
Contributor

It seems the merged PR (#25532) only solved the scheduler crash, which is part of this issue, but not the real underlying problem. I feel we should still keep this open. WDYT? cc: @potiuk

@potiuk
Member

potiuk commented Aug 10, 2022

I think this issue is about the "crash", which was the "strong" manifestation of the problem and did not let us see all the important details of the actual issue. Rather than re-opening this one, I'd open another one solely to handle the duplicate task instance problem, if we can get hold of more information and logs on it.

@potiuk
Member

potiuk commented Aug 10, 2022

Mostly because once we solve the crash, we might have more data/information about the underlying problem (things are a bit clouded now, and it's even likely there are a few different reasons that triggered this crash).

@uranusjr
Member

We have another issue opened for the data integrity problem (#25200); we can track things there.

@Sangameshmsopen

Hello all,
We have observed that the Airflow scheduler heartbeat stopped and all DAGs were left in the queued or running state. We checked the scheduler logs and got the error below. May I know what might be the cause of this?

Details:
We are running Airflow in Google cloud Composer service with below versions
composer-2.1.4
airflow-2.4.3

Error:
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey"
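
For context, PendingRollbackError is SQLAlchemy reporting that an earlier flush failed (here, the same task_instance_pkey violation) and the session was then reused without a rollback. A minimal, self-contained illustration (plain SQLAlchemy, not Airflow code):

from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.exc import IntegrityError, PendingRollbackError
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TI(Base):  # stand-in for the task_instance table
    __tablename__ = "ti"
    id = Column(Integer, primary_key=True)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(TI(id=1))
    session.commit()
    session.add(TI(id=1))  # duplicate primary key, like task_instance_pkey
    try:
        session.flush()
    except IntegrityError:
        pass  # swallowing the error without session.rollback() ...
    try:
        session.query(TI).all()
    except PendingRollbackError as err:
        print(err)  # ... makes the next use of the session raise this
        session.rollback()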

@potiuk
Member

potiuk commented Feb 20, 2023

Detailed logs and circumstances would be needed (see #25200 for an example of how detailed information helped fix a similar issue).

@Sangameshmsopen

Sangameshmsopen commented Feb 20, 2023

@potiuk

Sure, I'm providing the details. The same issue has occurred in one of our testing environments as well.

Apache Airflow version: 2.4.3
Google Cloud Composer version: 2.1.4

What happened:

We have one scheduled DAG which triggers at 12:00 AM. The DAG has a main task as well as sub-tasks.
The sub-tasks are created dynamically based on a few arguments.

When a dynamically created sub-task starts in the DAG, its instance details show as null (meaning no task instance was created for that task; please refer to screenshot 1), so I don't get any logs for that task.

Screenshot 1:
Screenshot 2023-02-20 at 3 38 47 PM

But when I checked the logs in the Composer service, I could see the error logged under the scheduler, and its timestamp is almost exactly when the scheduler heartbeat stopped (please refer to screenshot 2).

Screenshot 2:
Screenshot 2023-02-20 at 3 48 14 PM

I need clarification regarding this issue:

  1. Could the scheduler heartbeat stop because of the above issue?
  2. In what other scenarios can the scheduler heartbeat stop?

Please let me know if any other details are required.

@potiuk
Member

potiuk commented Feb 20, 2023

@uranusjr - wasn't the "null" mapped task instance issue fixed since 2.4.3? I cannot find it easily.
