Skip to content

Commit

Permalink
Fix apply defaults for task decorator (#16085)
Browse files Browse the repository at this point in the history
  • Loading branch information
junnplus committed May 27, 2021
1 parent cdc9f1a commit 9d06ee8
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 0 deletions.
14 changes: 14 additions & 0 deletions airflow/decorators/base.py
Expand Up @@ -160,6 +160,20 @@ def _handle_output(self, return_value: Any, context: Dict, xcom_push: Callable):
)
return return_value

def _hook_apply_defaults(self, *args, **kwargs):
if 'python_callable' not in kwargs:
return args, kwargs

python_callable = kwargs['python_callable']
default_args = kwargs.get('default_args') or {}
op_kwargs = kwargs.get('op_kwargs') or {}
f_sig = signature(python_callable)
for arg in f_sig.parameters:
if arg not in op_kwargs and arg in default_args:
op_kwargs[arg] = default_args[arg]
kwargs['op_kwargs'] = op_kwargs
return args, kwargs


T = TypeVar("T", bound=Callable) # pylint: disable=invalid-name

Expand Down
6 changes: 6 additions & 0 deletions airflow/models/baseoperator.py
Expand Up @@ -172,6 +172,12 @@ def apply_defaults(self, *args: Any, **kwargs: Any) -> Any:
if dag_params:
kwargs['params'] = dag_params

if default_args:
kwargs['default_args'] = default_args

if hasattr(self, '_hook_apply_defaults'):
args, kwargs = self._hook_apply_defaults(*args, **kwargs) # pylint: disable=protected-access

result = func(self, *args, **kwargs)

# Here we set upstream task defined by XComArgs passed to template fields of the operator
Expand Down
16 changes: 16 additions & 0 deletions tests/decorators/test_python.py
Expand Up @@ -411,6 +411,22 @@ def do_run():
ret = do_run()
assert ret.operator.owner == 'airflow' # pylint: disable=maybe-no-member

@task_decorator
def test_apply_default_raise(unknow):
return unknow

with pytest.raises(TypeError):
with self.dag:
test_apply_default_raise() # pylint: disable=no-value-for-parameter

@task_decorator
def test_apply_default(owner):
return owner

with self.dag:
ret = test_apply_default() # pylint: disable=no-value-for-parameter
assert 'owner' in ret.operator.op_kwargs

def test_xcom_arg(self):
"""Tests that returned key in XComArg is returned correctly"""

Expand Down

0 comments on commit 9d06ee8

Please sign in to comment.