add batch fetching of data version records #21798
Conversation
Force-pushed from 75b0afe to fa64bb7
python_modules/dagster/dagster/_core/execution/context/data_version_cache.py
Force-pushed from af3e771 to 009f3b5
Nice. All very reasonable and clean.
Requesting changes to drive discussion on the configurability piece.
```python
if TYPE_CHECKING:
    from dagster._core.execution.context.compute import StepExecutionContext

ASSET_RECORD_BATCH_SIZE = 100
```
Is there a way we can make this configurable and driven from a source of truth in Dagster Plus when the user is a Plus user? It would be great to have control over this to tune perf and control incidents.
I think the main way to do this (since this is called from user code) is to make it a property of the instance.
Something like:

```python
# in DagsterInstance
@property
def max_recommended_batch_size(self) -> int:
    ...
```
And then the question is how narrowly we scope it. Is it for fetching events? All records?
I guess the thing to do for this particular case is to set the recommended batch size and then warn in OSS if a caller exceeds it.
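A minimal sketch of the idea above: an instance-level recommended batch size plus an OSS-side warning when a caller exceeds it. The class and function names here are hypothetical, not the actual Dagster API.

```python
import warnings

class DagsterInstanceSketch:
    """Stand-in for DagsterInstance; the property name is illustrative."""

    @property
    def max_recommended_batch_size(self) -> int:
        # A Plus deployment could override this to tune perf remotely.
        return 100

def fetch_in_batches(instance, keys, batch_size):
    """Yield chunks of `keys`, warning (not failing) in OSS when the
    requested batch size exceeds the instance's recommendation."""
    if batch_size > instance.max_recommended_batch_size:
        warnings.warn(
            f"Batch size {batch_size} exceeds the recommended maximum of "
            f"{instance.max_recommended_batch_size}."
        )
    for i in range(0, len(keys), batch_size):
        yield keys[i : i + batch_size]
```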
Yeah that makes sense. @gibsondan do you have a position on doing this approach?
I think the limit here should probably be tuned to this specific query callsite (as opposed to prescribing the maximum number of asset records you would want to fetch in any situation; it probably depends on the context).
I think it depends on whether we want to be able to tune it remotely on the cloud servers without the user taking any action.
If we do, a simple option would be to make it an env var. This is how we handled it for batch writing store events, and there are various options for both users and us to do remote tuning of env var values.
(if we do this we should just be sure to re-fetch the env var from os.environ every time it is accessed)
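To illustrate the point about re-fetching: reading the variable from `os.environ` on every access means a value injected after process start (e.g. by remote tuning) still takes effect. The env var name and default below are hypothetical.

```python
import os

def get_asset_record_batch_size(default: int = 100) -> int:
    """Read the batch size from the environment on every call, so a value
    set after startup is picked up; fall back to the default if the var is
    unset or not a valid integer."""
    raw = os.environ.get("DAGSTER_ASSET_RECORD_BATCH_SIZE")
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        return default
```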
Is this true even though this code is run in user-code (e.g. run containers)? Edit: I guess that makes sense if we use it for store event calls
I DMed daniel this exact question :-)
python_modules/dagster/dagster/_core/execution/context/data_version_cache.py
…#21809)

## Summary & Motivation

The current implementation of `get_latest_data_version_record` would always fetch the latest observation AND the latest materialization. It would then return one or the other, preferring the materialization in the ambiguous case. This PR short-circuits the ambiguous case such that we only fetch the observation when the materialization is not present. This effectively halves (from 2 calls => 1 call) the round-trips to storage in the most common case (materializable assets).

Additionally, this PR replaces calls to the generic `get_event_records` API with the narrower `fetch_X` APIs.

This PR is orthogonal to #21798, which replaces calls to `get_latest_data_version_record` with calls to the batchable `get_asset_records` in the simple case where storage_id filters and partition filters are not applied.

Should note: this does not change the logic of which record is returned. The API name is a little misleading because we will return the latest materialization record even if the observation record is more recent, for materializable assets with both types of records.

## How I Tested These Changes

BK
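The short-circuit described above can be sketched as follows. The `fetch_latest_materialization` / `fetch_latest_observation` helpers are stand-ins for the storage's narrower `fetch_X` APIs, not the actual signatures.

```python
def get_latest_data_version_record(storage, asset_key):
    """Return the latest materialization if one exists; only fall back to
    the (second) storage round-trip for the observation when it does not."""
    materialization = storage.fetch_latest_materialization(asset_key)
    if materialization is not None:
        # Prefer the materialization and skip the observation fetch entirely.
        return materialization
    # No materialization: pay for the second call to get the observation.
    return storage.fetch_latest_observation(asset_key)
```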
Cool great
Summary & Motivation
Asset graphs with large fan-in can incur a hefty data-fetching cost when used with data versions. This PR fetches the asset record for a batched set of asset keys. The asset record has the last materialization record, and potentially the last observation record (in Plus), reducing the number of serial fetches we have to make to get the input data versions.
This batching of calls is only possible because we're not filtering the records (obs/mats) that we're fetching (either by partition or by storage id).
How I Tested These Changes
Added an explicit fan-in data version test that checks the underlying data-fetching calls. It went from 200 calls to `get_event_records` to 1 call to `get_asset_records`.
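The batching described in the summary can be sketched as below, using the `ASSET_RECORD_BATCH_SIZE` constant from the diff. The storage object and the shape of its `get_asset_records` return value are assumptions for illustration.

```python
ASSET_RECORD_BATCH_SIZE = 100  # constant from the diff above

def get_asset_records_batched(storage, asset_keys):
    """Fetch asset records for many keys in chunks, replacing N serial
    per-key fetches with ceil(N / ASSET_RECORD_BATCH_SIZE) batched calls."""
    records = {}
    for i in range(0, len(asset_keys), ASSET_RECORD_BATCH_SIZE):
        batch = asset_keys[i : i + ASSET_RECORD_BATCH_SIZE]
        for record in storage.get_asset_records(batch):
            records[record.asset_key] = record
    return records
```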