From 5668888a7e1074a620b3d38f407ecf1aa055b623 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 5 Aug 2022 17:15:31 +0100 Subject: [PATCH] Fix "This Session's transaction has been rolled back" (#25532) Accessing the run_id(self.run_id) on exception leads to error because sessions are invalidated on exception. Here we extract the run_id before handling the exception --- airflow/models/dagrun.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index e902f687a07d1..c3ac204dfe804 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1138,6 +1138,10 @@ def _create_task_instances( :param session: the session to use """ + # Fetch the information we need before handling the exception to avoid + # PendingRollbackError due to the session being invalidated on exception + # see https://github.com/apache/superset/pull/530 + run_id = self.run_id try: if hook_is_noop: session.bulk_insert_mappings(TI, tasks) @@ -1151,7 +1155,7 @@ def _create_task_instances( self.log.info( 'Hit IntegrityError while creating the TIs for %s- %s', dag_id, - self.run_id, + run_id, exc_info=True, ) self.log.info('Doing session rollback.')