Skip to content

Commit

Permalink
Listener: Set task on sqlalchemy taskinstance object (#27167)
Browse files Browse the repository at this point in the history
same as #21157

TaskListener API's contract promises to pass TaskInstance object to listener plugin. However, what happens is not 100% true - the object being passed is one that maps to current SQLAlchemy session.

`_run_raw_task` before merging the TI operates on detached TaskInstance object, then merges it to current session. Since there is no attached object in the SQLAlchemy identity map, SQLAlchemy creates it, and it's this object that's being passed to the SQLAlchemy event listeners.

The problem with that is that when creating new SQLAlchemy object, SQLAlchemy takes care about setting only database-mapped fields. The ones that are purely on the python side, like task aren't being set on the new object.

This PR manually sets `task` on the new SQLAlchemy object, so that `on_task_instance_success` receives proper TaskInstance with task field set.
  • Loading branch information
kaxil committed Oct 20, 2022
1 parent 447a9bc commit 395ad71
Showing 1 changed file with 1 addition and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
Expand Up @@ -1522,7 +1522,7 @@ def _run_raw_task(

if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
session.merge(self).task = self.task
if self.state == TaskInstanceState.SUCCESS:
self._register_dataset_changes(session=session)
session.commit()
Expand Down

0 comments on commit 395ad71

Please sign in to comment.