How to define an unpartitioned asset that depends on all upstream partitions #15375
For example: I have a daily partitioned asset, and I want to merge all of its partitions into a downstream unpartitioned asset that contains the rows from every partition.
When an unpartitioned asset is downstream of a partitioned asset, Dagster automatically infers that the unpartitioned asset depends on all upstream partitions. The opposite also applies: when an unpartitioned asset is upstream of a partitioned asset, each downstream partition depends on the upstream unpartitioned asset. You can define this relationship in code as follows:

```python
from dagster import DailyPartitionsDefinition, asset

@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"))
def daily_data():
    ...

@asset
def aggregated_report(context, daily_data):
    # The default IO manager passes a dictionary of outputs mapped by
    # partition key: {"2023-01-01": ..., "2023-01-02": ..., ...}
    context.log.info(daily_data)
    ...
```

Under the hood, Dagster applies an `AllPartitionMapping` to this dependency, which is equivalent to declaring it explicitly:

```python
from dagster import AllPartitionMapping, AssetIn, asset

@asset(ins={"daily_data": AssetIn(partition_mapping=AllPartitionMapping())})
def aggregated_report(context, daily_data):
    ...
```