Add output property to MappedOperator #25604

Merged
merged 6 commits into from Aug 28, 2022


josh-fell
Contributor

@josh-fell josh-fell commented Aug 8, 2022

Currently the output property of operators is only available to unmapped "classic" operators. There can be use cases in which XComs from a set of mapped tasks need to be consumed by a downstream task. Of course, accessing these mapped-task XComs can be done with the typical Jinja template, but having the convenience of using the output property would be a "quality of life" improvement for DAG authors (especially since task dependencies would then be automatically implemented too).

Rather than adding the property to MappedOperator and having this property definition exist in two locations (along with BaseOperator), I opted to add it to AbstractOperator, given that all tasks have an output (even if that output is empty) and both BaseOperator and MappedOperator share the AbstractOperator implementation. If there is a higher-level reason to have two instances of the property definition, I'm happy to oblige.

This PR adds the output property to MappedOperator and includes an XComArg return type annotation for it. Additionally, until a better API is decided for accessing, via output, an XCom key other than "return_value", the example DAGs and system tests using this API have been updated to explicitly use XComArg(operator, key=...) instead.
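As a rough illustration of the design choice described above (one `output` definition on the shared base class instead of one copy per operator class), here is a toy model in plain Python. All names are hypothetical stand-ins: `ToyAbstractOperator`, `ToyBaseOperator`, `ToyMappedOperator` and `ToyXComArg` only mimic the roles of Airflow's `AbstractOperator`, `BaseOperator`, `MappedOperator` and `XComArg`; this is not how Airflow actually implements them.

```python
from __future__ import annotations

from dataclasses import dataclass

RETURN_VALUE_KEY = "return_value"  # the default XCom key used when a task returns a value


@dataclass(frozen=True)
class ToyXComArg:
    """Hypothetical stand-in for XComArg: a reference to an upstream task's XCom."""

    operator: ToyAbstractOperator
    key: str = RETURN_VALUE_KEY


class ToyAbstractOperator:
    """Hypothetical shared base class, playing the role of AbstractOperator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    @property
    def output(self) -> ToyXComArg:
        # Defined once here, so every subclass inherits the same property.
        return ToyXComArg(self)


class ToyBaseOperator(ToyAbstractOperator):
    """Hypothetical "classic" operator; no output definition of its own."""


class ToyMappedOperator(ToyAbstractOperator):
    """Hypothetical mapped operator; also relies on the inherited property."""
```

Because the property lives on the base class, both subclasses expose `output` without redefining it, and a non-default key is requested by constructing the reference explicitly, in the spirit of `XComArg(operator, key=...)`.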

TODO:
Add docs demonstrating the use of this property with mapped tasks. May be handled in #25617



@uranusjr
Member

uranusjr commented Aug 9, 2022

TIL about output. Maybe we should promote it more instead of telling the user to import XComArg directly!

@potiuk
Member

potiuk commented Aug 9, 2022

> TIL about output. Maybe we should promote it more instead of telling the user to import XComArg directly!

Yeah. Promoting TaskFlow and outputs more is something we should really do.
I think we should start by changing our documentation: reversing the sequence and generally putting a different emphasis on:

  • "The right way of writing tasks (TaskFlow and Hooks)" and
  • "The old and legacy way of doing things (the operators)".

@josh-fell josh-fell marked this pull request as ready for review August 9, 2022 16:35
@josh-fell
Contributor Author

Yeah slowly trying to change the docs around -- at least to promote TaskFlow API instead of the classic operators. I logged #25319 to update code snippets throughout.

@uranusjr
Member

Hmm, the static check failures raise a point for discussion. XComArg has an (unfortunate) API design, xcom_arg[key], that can be used to pull an XCom under a different key, but I’m not sure I like it being used with output. To me, task.output["foo"] feels more like accessing a dict than pulling another XCom. The API design can be discussed/changed later, but here I think we should decide whether this is a usage we want to encourage or not (personally, I’m against it).

@josh-fell
Contributor Author

josh-fell commented Aug 15, 2022

> Hmm, the static check failures raise a point for discussion. XComArg has an (unfortunate) API design xcom_arg[key] that can be used to pull an XCom under a different key, but I’m not sure I like it being used with output. task.output["foo"] feels to me more like accessing a dict, not pulling another XCom. The API design can be discussed/changed later, but here I think we should decide whether that is a usage we want to encourage or not (personally I’m against).

I have the same icky feeling about the current syntax. It's also not functionally equivalent to the classic ti.xcom_pull(task_ids=..., key=...)["some_item"], which I opened an issue about last year. There was some discussion on what to change the API to, but there was also a bit of a "that ship has sailed" vibe. Since this is coming up again, it's probably time to raise it for discussion on the dev list.
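The mismatch between the two spellings can be shown with a toy model that needs no Airflow install. The in-memory store, `xcom_pull()` and `ToyXComArg` below are hypothetical simplifications, not Airflow's API: indexing the result of `xcom_pull` looks inside the pulled value, while the bracket syntax on the XComArg-like object switches to a different XCom key entirely.

```python
from typing import Any, Dict, Tuple

# Hypothetical in-memory XCom store keyed by (task_id, key).
XCOM_STORE: Dict[Tuple[str, str], Any] = {
    ("push_task", "return_value"): {"some_item": 1, "other_item": 2},
    ("push_task", "some_item"): "a separately pushed XCom",
}


def xcom_pull(task_id: str, key: str = "return_value") -> Any:
    """Toy version of pulling a single XCom value from the store."""
    return XCOM_STORE[(task_id, key)]


class ToyXComArg:
    """Toy reference whose bracket syntax selects a different XCom *key*."""

    def __init__(self, task_id: str, key: str = "return_value") -> None:
        self.task_id = task_id
        self.key = key

    def __getitem__(self, key: str) -> "ToyXComArg":
        # Brackets switch the XCom key; they do not index into the pulled value.
        return ToyXComArg(self.task_id, key=key)

    def resolve(self) -> Any:
        # Pull whatever XCom this reference currently points at.
        return xcom_pull(self.task_id, self.key)
```

With this data, `xcom_pull("push_task")["some_item"]` yields the dict entry `1`, whereas `ToyXComArg("push_task")["some_item"].resolve()` yields the separately pushed XCom, which is exactly the "looks like a dict access but isn't" confusion raised above.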

In the meantime, WDYT about allowing operator.output(key=...)? It's already functionally possible to pass a key arg to XComArg, the syntax isn't too bad, and the API is less confusing than the current one. I'm happy to work on this if it's a happy middle ground.

@josh-fell
Contributor Author

Side note, the example DAGs were all updated as best as possible to follow the new TaskFlow syntax (see #10285). I have no problem updating these again.

@uranusjr
Member

> WDYT about allowing operator.output(key=...)?

Doesn’t this conflict with using op.output as a property, though? It’s possible, but quite difficult, to implement output as both an XComArg and a callable. Maybe op.get_xcom(key=...) (or something like that) would be easier.
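To see why `op.output(key=...)` clashes with `output` being a property, consider this toy model (all names are hypothetical, not Airflow's API). For the call syntax to work, the object returned by the property must itself be callable, so every reference object grows a `__call__`; a separate method such as the `get_xcom(key=...)` floated above avoids overloading the property's return value.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyXComArg:
    """Hypothetical reference to one XCom of one task."""

    task_id: str
    key: str = "return_value"

    def __call__(self, key: str) -> "ToyXComArg":
        # Only needed if `op.output(key=...)` must work: the property's value
        # has to double as a callable that builds a new reference.
        return ToyXComArg(self.task_id, key=key)


class ToyOperator:
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    @property
    def output(self) -> ToyXComArg:
        return ToyXComArg(self.task_id)

    def get_xcom(self, key: str = "return_value") -> ToyXComArg:
        # Hypothetical method name from the review; a plain method keeps the
        # property's return value a simple, non-callable reference.
        return ToyXComArg(self.task_id, key=key)
```

Both spellings produce the same reference here; the difference is that the method-based version keeps the property's return value a plain reference rather than something that also behaves like a function.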

The general direction feels like a good idea to me though. I also plan to add some mechanism in #25661 to avoid exposing the bracket syntax:

  1. Keep XComArg(...) and the return value of @task working as-is (so any existing code depending on the bracket syntax continues to work)
  2. Introduce a new XComArg class that does not provide the bracket syntax, and use that in output and get_xcom() etc.

Does this sound like a good plan?
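A rough sketch of that two-class plan, using hypothetical names (`LegacyXComRef`, `PlainXComRef`, `ToyOperator`) rather than anything Airflow actually ships: the legacy class keeps `ref[key]` for backward compatibility, while `output` hands out a class that simply has no `__getitem__`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PlainXComRef:
    """Hypothetical new-style reference to one XCom; deliberately no `ref[key]`."""

    task_id: str
    key: str = "return_value"


@dataclass(frozen=True)
class LegacyXComRef(PlainXComRef):
    """Hypothetical legacy reference that keeps the bracket syntax working."""

    def __getitem__(self, key: str) -> "LegacyXComRef":
        # Existing code relying on `ref["other_key"]` keeps working.
        return LegacyXComRef(self.task_id, key=key)


class ToyOperator:
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id

    @property
    def output(self) -> PlainXComRef:
        # New entry points such as `output` return the class without brackets.
        return PlainXComRef(self.task_id)
```

Subscripting `ToyOperator("t").output` then fails with a `TypeError`, so the ambiguous dict-like spelling is simply unavailable on the new entry points.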

@josh-fell josh-fell changed the title Add output property to common operator implementation Add output property to MappedOperator Aug 23, 2022
@josh-fell
Contributor Author

josh-fell commented Aug 23, 2022

> The general direction feels like a good idea to me though. I also plan to add some mechanism in #25661 to avoid exposing the bracket syntax:
>
>   1. Keep XComArg(...) and the return value of @task working as-is (so any existing code depending on the bracket syntax continues to work)
>   2. Introduce a new XComArg class that does not provide the bracket syntax, and use that in output and get_xcom() etc.
>
> Does this sound like a good plan?

Sounds like a good one to me.

@josh-fell josh-fell force-pushed the mappedop-output-prop branch 2 times, most recently from f0db611 to ea3c62f August 23, 2022 20:56
Comment on lines +82 to +83
f'echo "The xcom pushed manually is {XComArg(bash_push, key="manually_pushed_value")}" && '
f'echo "The returned_value xcom is {XComArg(bash_push)}" && '
Member


The latter one can still use output, although using XComArg for both is probably more consistent…?

Contributor Author


Yeah, it might be better to be consistent. There are some other places I need to clean that up to use XComArg() throughout rather than .output for now. Is that cool?

Contributor Author


Although maybe it's fine as long as each file is consistent. .output is still a valid and useful API. I don't think we want to rip it out of all of the examples.

@@ -99,18 +100,20 @@
)
# [END howto_operator_ecs_register_task_definition]

registered_task_definition = cast(str, register_task.output)
Member


It’s probably a good idea to add a Mypy plugin for this… I’ll look into it after this is merged.

Contributor Author


Great! I was going to ask if there was a better way than doing all of these cast() calls. There had to be something we could do with mypy specifically.
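For context on the `cast()` calls: presumably `register_task.output` is annotated as returning an `XComArg`, while downstream operator parameters (such as a task definition) are annotated as `str`, so mypy rejects passing the reference directly. `typing.cast` only changes what the type checker sees and returns its argument unchanged at runtime, which this small sketch makes visible; `ToyXComArg` and `register_task_output()` are hypothetical stand-ins for Airflow's objects.

```python
from typing import cast


class ToyXComArg:
    """Hypothetical stand-in for the reference object that `operator.output` returns."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


def register_task_output() -> ToyXComArg:
    # Hypothetical: models `register_task.output`, typed as an XComArg-like object.
    return ToyXComArg("register_task")


output = register_task_output()

# `cast` tells the type checker to treat the value as `str` (what a downstream
# parameter annotated `str` expects); at runtime it returns `output` unchanged.
registered_task_definition = cast(str, output)
```

A mypy plugin, as suggested in the review, could teach the type checker about this substitution instead of requiring a `cast()` at every call site.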

@josh-fell
Contributor Author

🤞 on CI. I think I finally got everything, including the new stuff.

@josh-fell
Contributor Author

Nice check to make sure providers would still be compatible with Airflow 2.2! Caught some import issues.

@potiuk
Member

potiuk commented Aug 26, 2022

> Nice check to make sure providers would still be compatible with Airflow 2.2! Caught some import issues.

:D :D :D -> sometimes those are REALLY useful (even if annoying).

Importing `XComArg` directly from the `airflow` namespace wasn't available until 2.3. Providers only have a minimum requirement of Airflow 2.2, so the previous import method would have implicitly required Airflow 2.3 or later.
@uranusjr uranusjr merged commit 1ed0146 into apache:main Aug 28, 2022
@josh-fell josh-fell deleted the mappedop-output-prop branch August 28, 2022 18:39
@ephraimbuddy ephraimbuddy added the type:new-feature Changelog: New Features label Sep 13, 2022
@ephraimbuddy ephraimbuddy added this to the Airflow 2.4.0 milestone Sep 14, 2022
sunank200 added a commit to astronomer/astro-sdk that referenced this pull request Dec 12, 2022
# Description
## What is the current behavior?
The `output` property of `MappedOperator` was implemented in Airflow 2.4.0, according to the release notes (see
[here](https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#id13))

Add output property to MappedOperator
apache/airflow#25604

closes: #1359 


## What is the new behavior?

- Catch exception for airflow version < 2.4.0 and use `XComArg(...)`
instead.
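A minimal sketch of that fallback, assuming the missing property surfaces as an `AttributeError` on Airflow < 2.4.0 (the description does not say which exception is caught). `get_output` is a hypothetical helper name, and `make_xcom_arg` stands in for `XComArg` so the sketch runs without Airflow installed.

```python
from typing import Any, Callable


def get_output(operator: Any, make_xcom_arg: Callable[[Any], Any]) -> Any:
    """Return `operator.output`, falling back for operators that lack it.

    `make_xcom_arg` is a hypothetical stand-in for `XComArg(...)`.
    """
    try:
        return operator.output
    except AttributeError:
        # e.g. a MappedOperator on Airflow < 2.4.0, which had no `output` property
        return make_xcom_arg(operator)
```

One trade-off of catching `AttributeError` around a property access: an `AttributeError` raised *inside* a real `output` implementation would also trigger the fallback. Checking the Airflow version explicitly avoids that, at the cost of hard-coding the version boundary.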


## Does this introduce a breaking change?
No

### Checklist
- [x] Created tests which fail without the change (if possible)
- [x] Extended the README / documentation, if necessary

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
kaxil pushed a commit to astronomer/astro-sdk that referenced this pull request Dec 13, 2022
(cherry picked from commit 0f5ff62)