[AIRFLOW-3871] render Operators template fields recursively #4743
Updated from 0bf4ac0 to a37993b.
Codecov Report
@@ Coverage Diff @@
## master #4743 +/- ##
========================================
Coverage ? 9.47%
========================================
Files ? 607
Lines ? 35032
Branches ? 0
========================================
Hits ? 3321
Misses ? 31711
Partials ? 0
Continue to review full report at Codecov.
@Fokko: I could not decide between:
(see https://issues.apache.org/jira/browse/AIRFLOW-3871)
airflow/models/__init__.py (outdated)
        setattr(content,
                attribute,
                self.render_template(attribute, value, context))
    except Exception:
I would strongly prefer more granular error handling here. Specifically, any exception raised from render_template should be propagated out.
My intention was a best-effort approach: try to render inner attributes as templates, or else keep the attributes unchanged.
E.g., when traversing a UUID attribute, render_template succeeds, but setattr raises an exception (UUID is immutable).
But you have a good point: if my inner template expression is wrong, I would like the exception to be propagated out, so I can understand the issue.
I could just put the render_template call outside the try statement?
I will add some test cases right away.
Yes, that would be acceptable. I'd also consider logging any exception at least as a warning.
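A minimal sketch of that split, assuming a toy "{{ name }}" substitution in place of Jinja: rendering runs outside the try block, so a broken template expression propagates, while only the assignment is guarded and its failures are logged as warnings. render_string and render_attributes are hypothetical helpers, not Airflow APIs.

```python
import logging
import re
import uuid

log = logging.getLogger(__name__)


def render_string(template, context):
    """Hypothetical stand-in for Jinja: replace "{{ name }}" with context[name].

    A name missing from the context raises KeyError, which plays the role of
    an invalid template expression here.
    """
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda match: str(context[match.group(1)]),
                  template)


def render_attributes(obj, attributes, context):
    """Render the listed string attributes of obj in place, best effort."""
    for name in attributes:
        value = getattr(obj, name)
        if not isinstance(value, str):
            continue
        # Rendering is outside the try block: template errors propagate.
        rendered = render_string(value, context)
        try:
            setattr(obj, name, rendered)
        except (AttributeError, TypeError) as exc:
            # Immutable objects such as uuid.UUID refuse assignment
            # (TypeError); keep the original value and log a warning.
            log.warning("Could not set %r on %r: %s", name, obj, exc)
    return obj


# Usage: the UUID keeps its value and a warning is logged instead of failing.
fixed_id = uuid.UUID("12345678123456781234567812345678")
render_attributes(fixed_id, ["hex"], {})
```

Catching only AttributeError and TypeError, rather than Exception, keeps the handling as granular as requested above.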
@galak75 By the way, not sure if you've seen this, but I've also commented on the ticket regarding the general approach. I'll repeat it here for reference:
Updated from 122f763 to 24b0004.
@bjoernpollex-sc: thanks a lot for your feedback. I have some questions about it:
This is a good point. In such a case, shouldn't the Airflow templating process be preferred?
I'm pretty new to Python; how could this happen? Isn't introspection meant to provide all existing attributes, methods, and so on, of an object? Do you have any example or reading about it?
It is a good alternative to avoid the first point: just choose the fields we want to be templated. One thing I really appreciate about approach #3 is that it works without any change to classes: just set a templated value on any attribute of any class (in an operator template_field), and this value will be rendered during DAG execution. If we have to customize a class so that its inner fields are templated, would you rather add a
I hope I'm clear enough... Thank you in advance for your answer.
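One concrete way an introspection-based walk (approach #3) can miss attributes, as an illustration only: vars() sees just the instance __dict__, so attributes stored in __slots__ are invisible (vars() raises TypeError), and a property shows up only under its private backing name, so the walk would touch private state rather than the public attribute. The classes and the helper below are hypothetical.

```python
class WithSlots:
    # No instance __dict__: the attribute lives in a slot.
    __slots__ = ("path",)

    def __init__(self, path):
        self.path = path


class WithProperty:
    def __init__(self, path):
        self._path = path

    @property
    def path(self):
        # The public name is a class-level property, not an instance attribute.
        return self._path


def introspected_attributes(obj):
    """Hypothetical approach #3 helper: instance attribute names via vars()."""
    try:
        return sorted(vars(obj))
    except TypeError:
        # vars() needs a __dict__, which __slots__ classes do not have.
        return []
```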
@galak75 Good points! Responses below:
Let me know if you have any more questions, and thanks for putting in the work here!
@bjoernpollex-sc: I totally agree with your statement below.
I have another question though, coming back to the code example in the JIRA description. Let's say that
Would it work?
After working on it, it looks like it works (see the additional unit tests). @bjoernpollex-sc: can you please review my latest changes?
airflow/models/__init__.py (outdated)
@@ -2181,6 +2181,7 @@ def __init__(
         # Private attributes
         self._upstream_task_ids = set()
         self._downstream_task_ids = set()
+        self._rendered_template_object_ids = set()
Ideally I'd like this not to be a member. It is only used once, during template rendering, so there's no need to keep it here: the template rendering method could pass it along as a parameter. This is not a big issue; it just seems cleaner to me to limit data to the scope where it's actually needed.
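A hypothetical sketch of the scoping difference (Node, render_chain and MemberRenderer are made-up names, and the member set is assumed never to be cleared): a set passed as a parameter is fresh for each top-level call, whereas a member set outlives the call, so a second rendering of the same objects would be skipped. Both versions stop on circular references. Also note that id() values are only unique among objects alive at the same time, which is another reason to keep such a set short-lived.

```python
class Node:
    """Hypothetical object with one templated attribute and a link to another node."""

    def __init__(self, template, child=None):
        self.template = template
        self.value = None
        self.child = child


def render_node(node, context):
    node.value = node.template.replace("{{ ds }}", context["ds"])


def render_chain(node, context, seen_ids=None):
    """Parameter version: the visited set lives only for this call."""
    if seen_ids is None:
        seen_ids = set()
    while node is not None and id(node) not in seen_ids:
        seen_ids.add(id(node))
        render_node(node, context)
        node = node.child


class MemberRenderer:
    """Member version: the visited set lives as long as the renderer."""

    def __init__(self):
        self._seen_ids = set()

    def render_chain(self, node, context):
        while node is not None and id(node) not in self._seen_ids:
            self._seen_ids.add(id(node))
            render_node(node, context)
            node = node.child
```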
I would rather keep it as a member to avoid changing the signatures of both BaseOperator.render_template and BaseOperator.render_template_from_field.
See the _render_template_object_recursively implementation.
That's a fair point.
@galak75 Thanks, this looks very good. The implementation seems fine to me, and the tests seem to cover all the cases. I've left one comment, but I'm not sure if it really needs to be addressed. Your example of a class MyAwesomeDataFileTransformer:
    def __init__(self, source, target):
        self.source = Path(source)
        self.target = Path(target)
Now, because the paths are wrapped internally in a
@bjoernpollex-sc, thank you for your feedback. The statement below works fine:
    p = pathlib.Path('/tmp/{{ dag.dag_id }}')
Then, I have an idea to solve the limitation you described: why not add an additional
Both templating hooks could be used together to provide even more flexibility. What do you think about it?
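A hypothetical variant of MyAwesomeDataFileTransformer, sketched under the assumption that inner fields are declared explicitly via template_fields. It keeps the raw strings in the templated attributes and builds paths only on access, after rendering has happened. PurePosixPath is used so the snippet behaves the same on any OS; building a path from a template string simply keeps the placeholder as text. render_fields is a toy helper, not an Airflow API.

```python
from pathlib import PurePosixPath


class MyAwesomeDataFileTransformer:
    # Hypothetical: the templated attributes stay plain strings.
    template_fields = ("source", "target")

    def __init__(self, source, target):
        self.source = source
        self.target = target

    @property
    def source_path(self):
        # Built on access, so it reflects the rendered string.
        return PurePosixPath(self.source)

    @property
    def target_path(self):
        return PurePosixPath(self.target)


def render_fields(obj, context):
    """Toy renderer: substitutes "{{ ds }}" in each declared template field."""
    for name in obj.template_fields:
        setattr(obj, name, getattr(obj, name).replace("{{ ds }}", context["ds"]))
    return obj
```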
I'm leaning towards a -1 on this: it seems there are going to be many more cases where this won't Just Work, leading to confusion for users. Can you give a concrete example of when you'd need this? In the examples you've shown above, why couldn't you create the
@ashb Suppose you have a
    transform = GCSFileTransformOperator(
        source="some/path/{{ds}}/file.xls",
        target="other/path/{{ds}}/file.csv",
        transform=ExcelToCSV,
        transform_args={"quote": True, "separator": ","}
    )
This works, but what if the operator has multiple such components as inputs? Consider the common file-system interfaces proposed in AIP-14. With those, we can make the operator more generic:
    transform = FileTransformOperator(
        source=GCSFileSource,
        source_args={"path": "some/path/{{ds}}/file.xls", "bucket": "some_gcs_bucket", "conn_id": "gcs_conn"},
        target=SFTPFileTarget,
        target_args={"path": "other/path/{{ds}}/file.csv", "conn_id": "sft_conn", "retries": 3},
        transform=ExcelToCSV,
        transform_args={"quote": True, "separator": ","}
    )
This starts to become impractical; it would be much simpler to just write:
    transform = FileTransformOperator(
        source=GCSFileSource(path="some/path/{{ds}}/file.xls", bucket="some_gcs_bucket", conn_id="gcs_conn"),
        target=SFTPFileTarget(path="other/path/{{ds}}/file.csv", conn_id="sft_conn", retries=3),
        transform=ExcelToCSV(quote=True, separator=",")
    )
This is essentially my use case in a nutshell.
Regarding your concern that this might lead to confusion: that's why I argued for an approach that requires explicit declaration of template fields in all objects/classes that support templates, because that leads to the fewest surprises (it's explicit, and it follows the current convention for operators). This can easily be added to existing classes, either by monkey-patching or by subclassing, so it's also quite flexible.
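A minimal sketch of the explicit-declaration approach applied to the components above. GCSFileSource, SFTPFileTarget and ExcelToCSV are hypothetical stand-ins here (the AIP-14 interfaces are only proposed), and render_declared_fields is a toy one-level renderer using "{{ name }}" substitution instead of Jinja. Only attributes listed in template_fields are rendered, so nothing gets templated by surprise.

```python
import re


def render_string(template, context):
    # Toy stand-in for Jinja: replace "{{ name }}" with context[name].
    return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                  lambda match: str(context[match.group(1)]),
                  template)


class GCSFileSource:
    template_fields = ("path",)

    def __init__(self, path, bucket, conn_id):
        self.path = path
        self.bucket = bucket
        self.conn_id = conn_id


class SFTPFileTarget:
    template_fields = ("path",)

    def __init__(self, path, conn_id, retries=0):
        self.path = path
        self.conn_id = conn_id
        self.retries = retries


class ExcelToCSV:
    template_fields = ()  # nothing to render

    def __init__(self, quote=False, separator=","):
        self.quote = quote
        self.separator = separator


def render_declared_fields(component, context):
    """Render only the string attributes the component declares as templated."""
    for name in getattr(component, "template_fields", ()):
        value = getattr(component, name)
        if isinstance(value, str):
            setattr(component, name, render_string(value, context))
    return component
```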
Sure @ashb. Here is the real use case:
We have a lot of different data sources to be imported, transformed and then exported to several destinations. So we tried to decouple our business logic (importing data, transforming it, and then exporting it) from Airflow callable functions.
First, we defined a template method to be used as a
    def process_data(dataImporter, dataTransformer, dataExporter):
        data = dataImporter.import_data()
        data = dataTransformer.transform_data(data)
        dataExporter.export_data(data)
Then we just have to "inject" the proper
    task1 = PythonOperator(
        task_id='task_1',
        python_callable=process_data,
        op_args=[
            SomeFileDataImporter('/tmp/{{ ds }}/input_data'),
            SomeDataTransformer(some_value, 'path/to/other/file/{{ ds }}/file'),
            SomeFileDataExporter('/data/output/{{ dag.dag_id }}/output_file')
        ],
        dag=dag
    )
    task2 = PythonOperator(
        task_id='task_2',
        python_callable=process_data,
        op_args=[
            SomeJoiningCsvFilesDataImporter(
                '/tmp/{{ ds }}/input_data_1',
                '/tmp/{{ ds }}/input_data_2',
                join_on='id'
            ),
            SomeOtherDataTransformer(some_value),
            SomeOtherDataExporter(execution_ts='{{ ts }}')
        ],
        dag=dag
    )
This approach has several benefits:
To make this approach work properly, we would like operators to be able to render nested template fields. The
I also suggested adding another
Thank you for taking the time to read this.
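Because process_data depends only on the methods of the three injected objects, it can be exercised without Airflow at all. A minimal sketch, assuming hypothetical in-memory implementations (ListImporter, UppercaseTransformer, CollectingExporter) and snake_case parameter names:

```python
def process_data(data_importer, data_transformer, data_exporter):
    """Template method: the injected objects supply each step."""
    data = data_importer.import_data()
    data = data_transformer.transform_data(data)
    data_exporter.export_data(data)


class ListImporter:
    def __init__(self, rows):
        self.rows = rows

    def import_data(self):
        return list(self.rows)


class UppercaseTransformer:
    def transform_data(self, data):
        return [row.upper() for row in data]


class CollectingExporter:
    def __init__(self):
        self.exported = None

    def export_data(self, data):
        # Keep the result in memory instead of writing it anywhere.
        self.exported = data
```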
Hello @ashb |
Updated from d173a12 to 0246619.
Hi everyone,
I do not want to be rude. I'm sure everyone has a lot to do, especially on this awesome product which is Airflow! But some constructive feedback on this PR (as well as on the related issue) would be tremendously appreciated...
About the need: supporting nested template field rendering in Operators would be really helpful to better design our DAGs, and it could probably help other users too. Of course, the Airflow team might have good reasons to decide this feature won't be supported. In that case, we could first have a discussion about the need, and then get a clear explanation of why this feature cannot be supported.
About the PR: IMHO, the solution reached with @bjoernpollex-sc's help solves most of the use cases. Of course, there might be a better solution. Any thoughts or suggestions to improve this PR would be welcome, and I would be glad to rework it if we can find a consensus.
My intent is to help improve Airflow. I'm glad this tool exists and I'm glad I can use it.
Updated from 0246619 to 80451c1.
airflow/models/baseoperator.py (outdated)
        return result

    def _render_nested_template_fields(self, content, context):
Since _rendered_template_object_ids is only relevant to this function, can we instead change the signature to:
-    def _render_nested_template_fields(self, content, context):
+    def _render_nested_template_fields(self, content, context, seen_oids):
@ashb: I have some concerns about this change. I thought about it, but it implies changing the signature of the public BaseOperator.render_template method to propagate seen_oids (the ids already visited) to nested template fields:
    def render_template(self, attr, content, context):
would become:
    def render_template(self, attr, content, context, seen_oids):
This is why I preferred storing this set as an instance variable.
If you think this is not an issue, just let me know and I'll do it.
(see #4743 (comment))
I think this diff should work:
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index b71f3c4ca1..3e081a5e3d 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -632,7 +633,10 @@ def render_template_from_field(self, attr, content, context, jinja_env):
         Renders a template from a field. If the field is a string, it will
         simply render the string and return the result. If it is a collection or
         nested set of collections, it will traverse the structure and render
-        all elements in it. If the field has another type, it will return it as it is.
+        all elements in it. For any other type, it will recursively render attributes
+        listed in its 'template_fields' attribute (class or instance level attribute)
+        when this 'template_fields' is defined only.
+        Finally returns the rendered field.
         """
         rt = self.render_template
         if isinstance(content, six.string_types):
@@ -644,9 +648,23 @@ def render_template_from_field(self, attr, content, context, jinja_env):
                 k: rt("{}[{}]".format(attr, k), v, context)
                 for k, v in list(content.items())}
         else:
-            result = content
+            result = self._render_nested_template_fields(content, context, seen_oids=set())
         return result

+    def _render_nested_template_fields(self, content, context, seen_oids):
+        if id(content) not in seen_oids:
+            seen_oids.add(id(content))
+            try:
+                nested_template_fields = content.template_fields
+            except AttributeError:
+                # content has no inner template fields
+                return content
+
+            for field in nested_template_fields:
+                rendered = self.render_template(field, getattr(content, field), context, seen_oids=seen_oids)
+                setattr(content, field, rendered)
+        return content
+
     def render_template(self, attr, content, context):
         """
         Renders a template either from a file or directly in a field, and returns
But I may have missed something.
Hi @ashb,
I might be wrong, but I think there is one change missing in your diff: the render_template signature has to be changed:
             for field in nested_template_fields:
                 rendered = self.render_template(field, getattr(content, field), context, seen_oids=seen_oids)
                 setattr(content, field, rendered)
         return content
-    def render_template(self, attr, content, context):
+    def render_template(self, attr, content, context, seen_oids):
         """
         Renders a template either from a file or directly in a field, and returns
And this is precisely the change I'm not sure is appropriate. What do you think?
Oh yeah, I did miss something.
             for field in nested_template_fields:
                 rendered = self.render_template(field, getattr(content, field), context, seen_oids=seen_oids)
                 setattr(content, field, rendered)
         return content
-    def render_template(self, attr, content, context):
+    def render_template(self, attr, content, context, seen_oids=None):
         """
         Renders a template either from a file or directly in a field, and returns
And then something like this.
                 k: rt("{}[{}]".format(attr, k), v, context)
                 for k, v in list(content.items())}
         else:
-            result = content
+            if seen_oids is None:
+                seen_oids = set()
+            result = self._render_nested_template_fields(content, context, seen_oids=seen_oids)
         return result
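Putting the pieces of this thread together, here is a standalone sketch of the resulting shape, not the merged Airflow code. TemplateRenderer is a hypothetical stand-in for BaseOperator, and render_string replaces Jinja with a toy "{{ name }}" substitution. The seen_oids=None default keeps existing three-argument calls working, and the set is created once per top-level call and passed down, so circular references terminate.

```python
import re


class TemplateRenderer:
    """Hypothetical stand-in for the BaseOperator methods discussed above."""

    def render_string(self, template, context):
        # Toy stand-in for Jinja: replace "{{ name }}" with context[name].
        return re.sub(r"\{\{\s*(\w+)\s*\}\}",
                      lambda match: str(context[match.group(1)]),
                      template)

    def render_template(self, attr, content, context, seen_oids=None):
        # Defaulting to None keeps existing three-argument callers working.
        if seen_oids is None:
            seen_oids = set()
        if isinstance(content, str):
            return self.render_string(content, context)
        if isinstance(content, list):
            return [self.render_template(attr, item, context, seen_oids)
                    for item in content]
        if isinstance(content, tuple):
            return tuple(self.render_template(attr, item, context, seen_oids)
                         for item in content)
        if isinstance(content, dict):
            return {key: self.render_template("{}[{}]".format(attr, key),
                                              value, context, seen_oids)
                    for key, value in content.items()}
        # Any other object: render its declared template_fields in place.
        self._render_nested_template_fields(content, context, seen_oids)
        return content

    def _render_nested_template_fields(self, content, context, seen_oids):
        if id(content) in seen_oids:
            return  # already visited: stops on circular references
        seen_oids.add(id(content))
        for field in getattr(content, "template_fields", ()):
            rendered = self.render_template(field, getattr(content, field),
                                            context, seen_oids)
            setattr(content, field, rendered)
```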
Thank you for your feedback.
I understand you would rather change the BaseOperator.render_template signature. I will rework this as soon as I can find some time for it (hopefully soon enough!).
airflow/models/baseoperator.py (outdated)
@@ -644,9 +648,23 @@ def render_template_from_field(self, attr, content, context, jinja_env):
                 k: rt("{}[{}]".format(attr, k), v, context)
                 for k, v in list(content.items())}
         else:
-            result = content
+            result = self._render_nested_template_fields(content, context)
-            result = self._render_nested_template_fields(content, context)
+            result = self._render_nested_template_fields(content, context, set())
(see next comment)
Minor changes to the code.
Sorry it sat unreviewed for so long, @galak75.
This needs to be added to the docs/ tree somewhere, probably in https://airflow.apache.org/concepts.html#id1
I missed your first ping, but please @ashb me once you've made these changes. (Oh, though I'm not around for most of May.)
airflow/models/baseoperator.py (outdated)
        return result

    def _render_nested_template_fields(self, content, context):
        if id(content) not in self._rendered_template_object_ids:
            self._rendered_template_object_ids.add(id(content))
-            self._rendered_template_object_ids.add(id(content))
+            seen_oids.add(id(content))
airflow/models/baseoperator.py (outdated)
        return result

    def _render_nested_template_fields(self, content, context):
        if id(content) not in self._rendered_template_object_ids:
-        if id(content) not in self._rendered_template_object_ids:
+        if id(content) not in seen_oids:
airflow/models/baseoperator.py (outdated)
                return content

            for field in nested_template_fields:
                rendered = self.render_template(field, getattr(content, field), context)
-                rendered = self.render_template(field, getattr(content, field), context)
+                rendered = self.render_template(field, getattr(content, field), context, seen_oids)
@@ -85,6 +87,10 @@ def setUp(self):
         self.addCleanup(self.dag.clear)
         self.clear_run()
         self.addCleanup(self.clear_run)
+        try:
Minor point, but since we are making other changes: could you move this to tearDown, as it is cleanup code?
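For reference, a generic sketch of the requested layout (ExampleTest and its temporary-file fixture are hypothetical, not the actual test_python_operator.py code): setUp builds the fixture, and tearDown, which unittest runs after each test method whenever setUp succeeded, removes it.

```python
import os
import tempfile
import unittest


class ExampleTest(unittest.TestCase):
    def setUp(self):
        # Hypothetical fixture: a temporary file the test writes into.
        fd, self.path = tempfile.mkstemp()
        os.close(fd)

    def tearDown(self):
        # Cleanup code lives here; it runs after each test method,
        # regardless of the test outcome, as long as setUp succeeded.
        try:
            os.remove(self.path)
        except FileNotFoundError:
            pass

    def test_write(self):
        with open(self.path, "w") as handle:
            handle.write("ok")
        with open(self.path) as handle:
            self.assertEqual(handle.read(), "ok")
```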
done
@ashb: thank you for the review. I'll rework it pretty soon.
Hi @ashb
Thank you
@galak75 Thanks again for the work, hoping to see this in soon! I was wondering though, how does this work with the
Hi @bjoernpollex-sc, I will try to take a look at this view soon.
Hi @ashb
@ashb: ready to be reviewed again!
This is a bit awkward: build is failing...
Seems there is a flaky build for Python 3.5 introduced recently (a fix is on its way). It sometimes passes, sometimes not. I restarted the failed jobs just in case that fixes it.
J.
On Tue, Aug 27, 2019 at 9:34 PM Géraud <***@***.***> wrote:
> This is a bit awkward: build is failing...
> 🤔
> It succeeded on my fork though... (https://travis-ci.org/VilledeMontreal/incubator-airflow/builds/577525614)
--
Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer
Updated from 7744a10 to 43fbd02.
@ashb: any other comments on this PR? I think it is ready to be merged.
@ashb 🙏
Looks good @galak75!
It's conflicting with master now though :( Can you ping me when you get a chance to resolve it?
[AIRFLOW-3871] test nested template fields and fix their rendering
[AIRFLOW-3871] test deep nested template fields
[AIRFLOW-3871] test template rendering on circular references
[AIRFLOW-3871] remove unused attribute
[AIRFLOW-3871] add docstring
[AIRFLOW-3871] remove tests now covered in TestBaseOperator class
[AIRFLOW-3871] move test case on missing nested template field
[AIRFLOW-3871] move test case on jinja invalid expression
[AIRFLOW-3871] cleanup PythonOperator tests
[AIRFLOW-3871] add test case with None value on nested template field
[AIRFLOW-3871] add test cases with None value and empty collections
[AIRFLOW-3871] refactor nested field rendering
[AIRFLOW-3871] extract function to actually render template fields
[AIRFLOW-3871] reuse extracted function for top fields rendering
[AIRFLOW-3871] format code
[AIRFLOW-3871] add typing definition to private method
[AIRFLOW-3871] nested template field rendering now returns nothing
[AIRFLOW-3871] nested template field rendering now returns nothing
Updated from 43fbd02 to 8781b1e.
@ashb: I rebased onto master and the build is green.
Sorry it took so many months to get this in, and thanks for sticking with us!
Sometimes, things don't go as smoothly as we want...
Same here, this is really awesome, can't wait for the next release!
… objects (apache#4743) (cherry picked from commit d567f9a)
Make sure you have checked all steps below.
Jira
Description
See the AIRFLOW-3871 Jira ticket description.
Tests
4 additional unit tests in test_python_operator.py
Commits
Documentation
Code Quality
flake8