From 3af6579d8e732364028bb35965f487a6c45676d9 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 1 Jul 2022 12:27:18 +0800 Subject: [PATCH] Disable attrs state management on MappedOperator The custom __getstate__ and __setstate__ implementation from attrs interacts badly with Airflow's DAG serialization and pickling. When a mapped task is deserialized, subclasses are coerced to MappedOperator. But when the instances go through DAG pickling, all attributes defined in the subclasses are dropped by attrs's custom state management. Since attrs does not do anything too special here (the logic is only important for slot=True), we can use Python's built-in implementation instead. --- airflow/models/mappedoperator.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py index 21a265e6e904c..e34b1501bc346 100644 --- a/airflow/models/mappedoperator.py +++ b/airflow/models/mappedoperator.py @@ -240,7 +240,17 @@ def _expand(self, **mapped_kwargs: "Mappable") -> "MappedOperator": return op -@attr.define(kw_only=True) +@attr.define( + kw_only=True, + # Disable custom __getstate__ and __setstate__ generation since it interacts + # badly with Airflow's DAG serialization and pickling. When a mapped task is + # deserialized, subclasses are coerced into MappedOperator, but when it goes + # through DAG pickling, all attributes defined in the subclasses are dropped + # by attrs's custom state management. Since attrs does not do anything too + # special here (the logic is only important for slots=True), we use Python's + # built-in implementation, which works (as proven by good old BaseOperator). + getstate_setstate=False, +) class MappedOperator(AbstractOperator): """Object representing a mapped operator in a DAG."""