Skip to content

Commit

Permalink
Disable attrs state management on MappedOperator (#24772)
Browse files Browse the repository at this point in the history
(cherry picked from commit 6fd06fa)
  • Loading branch information
uranusjr authored and ephraimbuddy committed Aug 19, 2022
1 parent 22faa65 commit 7914c6c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
12 changes: 11 additions & 1 deletion airflow/models/mappedoperator.py
Expand Up @@ -239,7 +239,17 @@ def _expand(self, **mapped_kwargs: "Mappable") -> "MappedOperator":
return op


@attr.define(kw_only=True)
@attr.define(
kw_only=True,
# Disable custom __getstate__ and __setstate__ generation since it interacts
# badly with Airflow's DAG serialization and pickling. When a mapped task is
# deserialized, subclasses are coerced into MappedOperator, but when it goes
# through DAG pickling, all attributes defined in the subclasses are dropped
# by attrs's custom state management. Since attrs does not do anything too
# special here (the logic is only important for slots=True), we use Python's
# built-in implementation, which works (as proven by good old BaseOperator).
getstate_setstate=False,
)
class MappedOperator(AbstractOperator):
"""Object representing a mapped operator in a DAG."""

Expand Down
16 changes: 16 additions & 0 deletions tests/serialization/test_dag_serialization.py
Expand Up @@ -24,6 +24,7 @@
import json
import multiprocessing
import os
import pickle
from datetime import datetime, timedelta
from glob import glob
from unittest import mock
Expand Down Expand Up @@ -1890,6 +1891,21 @@ def x(arg1, arg2, arg3):
"retry_delay": timedelta(seconds=30),
}

# Ensure the serialized operator can also be correctly pickled, to ensure
# correct interaction between DAG pickling and serialization. This is done
# here so we don't need to duplicate tests between pickled and non-pickled
# DAGs everywhere else.
pickled = pickle.loads(pickle.dumps(deserialized))
assert pickled.mapped_op_kwargs == {
"arg2": {"a": 1, "b": 2},
"arg3": _XComRef("op1", XCOM_RETURN_KEY),
}
assert pickled.partial_kwargs == {
"op_args": [],
"op_kwargs": {"arg1": [1, 2, {"a": "b"}]},
"retry_delay": timedelta(seconds=30),
}


@pytest.mark.filterwarnings("ignore::DeprecationWarning")
@pytest.mark.parametrize(
Expand Down

0 comments on commit 7914c6c

Please sign in to comment.