Avoid explicit consolidation in topk rendering #27068

Merged
antiguru merged 3 commits into MaterializeInc:main from topk_no_consolidate on May 14, 2024

Conversation

antiguru
Member

@antiguru commented May 13, 2024

Motivation

This PR adds a feature that has not yet been specified.

Change the rendering of topk plans to avoid an intermediate consolidate. At the moment, we render plans by forking the input, arranging and reducing one side, then concatenating the input with the negated reduction output, and consolidating the result. This makes sure that we consolidate eagerly, but it also duplicates work: the next operator forms an arrangement anyway, so we can reuse that instead.

This PR implements this pattern, removing one consolidate from each topk stage and adding one back after the final stage to ensure the topk output itself is consolidated. Note that we now apply the hash modulus to uncompacted data, whereas the data was previously guaranteed to be consolidated. This might increase the cost of the operator by a factor of 2.
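
To illustrate the pattern described above, here is a minimal sketch in plain Rust that models a collection as a list of `(group, value, diff)` updates. It is not the code in this PR: the names `Update`, `consolidate`, `negated_stage`, and `topk` are hypothetical, grouping in a `BTreeMap` stands in for the arrangement, and `value % modulus` stands in for the hash modulus. It assumes accumulated multiplicities are non-negative and every modulus is positive.

```rust
use std::collections::BTreeMap;

/// Hypothetical update type: (group, value, diff).
type Update = (u32, i64, i64);

/// Sum the diffs of identical (group, value) pairs and drop zero entries.
fn consolidate(updates: &[Update]) -> Vec<Update> {
    let mut sums: BTreeMap<(u32, i64), i64> = BTreeMap::new();
    for &(group, value, diff) in updates {
        *sums.entry((group, value)).or_insert(0) += diff;
    }
    sums.into_iter()
        .filter(|&(_, diff)| diff != 0)
        .map(|((group, value), diff)| (group, value, diff))
        .collect()
}

/// For each (group, bucket), emit negated updates for all records that fall
/// outside the `limit` smallest values. Grouping accumulates diffs, so the
/// input does not need to be consolidated first.
fn negated_stage(input: &[Update], modulus: i64, limit: i64) -> Vec<Update> {
    let mut buckets: BTreeMap<(u32, i64), BTreeMap<i64, i64>> = BTreeMap::new();
    for &(group, value, diff) in input {
        // Assumption: `value % modulus` stands in for the hash modulus.
        let bucket = value.rem_euclid(modulus);
        *buckets
            .entry((group, bucket))
            .or_default()
            .entry(value)
            .or_insert(0) += diff;
    }
    let mut retractions = Vec::new();
    for ((group, _bucket), values) in buckets {
        let mut remaining = limit;
        for (value, count) in values {
            if count <= 0 {
                continue; // fully cancelled records contribute nothing
            }
            let keep = count.min(remaining);
            remaining -= keep;
            if count > keep {
                retractions.push((group, value, keep - count));
            }
        }
    }
    retractions
}

/// Run one stage per modulus. Each stage concatenates its input with the
/// negated output; consolidation happens per stage or only at the end.
fn topk(input: &[Update], moduli: &[i64], limit: i64, consolidate_each_stage: bool) -> Vec<Update> {
    let mut current = input.to_vec();
    for &modulus in moduli {
        let retractions = negated_stage(&current, modulus, limit);
        current.extend(retractions);
        if consolidate_each_stage {
            current = consolidate(&current);
        }
    }
    consolidate(&current)
}
```

In this model, calling `topk` with `consolidate_each_stage` set to `true` or `false` yields the same final collection. The difference is only in how many updates the later stages have to group, which is the cost trade-off noted above.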

Tips for reviewer

Best viewed with whitespace changes hidden!

Checklist

Contributor

@frankmcsherry left a comment

I think I understand this. Going forward I submit that more comments would help. This is not new to this PR, and is inherited from before, but it is hard to understand what has happened when there are no comments that either state what needs to be true or that change as we change the implementation.

Comment on lines 365 to 408
let (input, oks, errs) = if validating {
    let from = |v: &Result<Row, Row>| v.into_owned();
    let (input, stage) =
        build_topk_negated_stage::<S, _, _, RowValSpine<Result<Row, Row>, _, _>>(
            &input, from, order_key, offset, limit, arity,
        );
    let stage = stage.as_collection(|k, v| (SharedRow::pack(k), v.clone()));
Contributor

Is it right that the `from` movement is tidying, and the only thing going on here is returning the other result returned from `build_topk_negated_stage`?

Member Author

Yes, this is to avoid the horrors of cargo fmt, which would spread it over approximately 150 lines otherwise.

@antiguru marked this pull request as ready for review May 14, 2024 15:30
@antiguru requested a review from a team as a code owner May 14, 2024 15:30
Contributor

@frankmcsherry left a comment

Looks good. Only thought was around the added consolidation and that perhaps it is optional, but also perhaps we should shake that out at a later date.

Comment on lines +200 to +203
// Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
let result = result.consolidate_named::<KeyBatcher<_, _, _>>(
    "Monotonic TopK final consolidate",
);
Contributor

Fwiw, I think we should revisit "consolidation" and come up with a consistent pattern for introducing it. For example, I'm a supporter of "before re-using a collection" to avoid consolidation that may then feed into an arrangement. I'm not sure we need to perform it before emitting results here, though. It seems harmless, as the PR already reduces the net number of consolidations, but I would love to revisit this.

Member Author

The reason I added the defensive consolidate is that I don't know what expectations downstream operators have about the form of the data. Ideally, all operators should function with non-consolidated data, but specifically monotonic implementations do not handle non-consolidated data well. (Should we have a different Diff for monotonic dataflows?)
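
As a hedged illustration of why non-consolidated input can be a problem for a monotonic implementation, consider the sketch below. It is a hypothetical model, not the actual operator: `monotonic_min` assumes every update is an insertion and ignores diffs unless it is asked to consolidate first, and the names `Update`, `consolidate`, and `monotonic_min` are made up for this example.

```rust
use std::collections::BTreeMap;

/// Hypothetical update type: (value, diff).
type Update = (i64, i64);

/// Sum the diffs per value and drop values whose diffs cancel out.
fn consolidate(updates: &[Update]) -> Vec<Update> {
    let mut sums: BTreeMap<i64, i64> = BTreeMap::new();
    for &(value, diff) in updates {
        *sums.entry(value).or_insert(0) += diff;
    }
    sums.into_iter().filter(|&(_, diff)| diff != 0).collect()
}

/// Hypothetical monotonic minimum. It treats every update as an insertion,
/// which is only valid if the input is consolidated and append-only.
fn monotonic_min(updates: &[Update], must_consolidate: bool) -> Option<i64> {
    let owned: Vec<Update> = if must_consolidate {
        consolidate(updates)
    } else {
        updates.to_vec()
    };
    // Diffs are deliberately ignored here: that is the monotonic shortcut.
    owned.iter().map(|&(value, _)| value).min()
}
```

With the input `[(7, 1), (3, 1), (3, -1)]`, the value `3` is inserted and retracted again, so only `7` is present after consolidation. The sketch returns `Some(3)` without consolidation and `Some(7)` with it.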

Contributor

@ggevay commented May 15, 2024

Monotonic operators have a `must_consolidate` flag, which informs them whether the input is consolidated.

This is tuned by `RelaxMustConsolidate`. This does abstract interpretation on the LIR trees, keeping track of whether there was an operation that changed the consolidatedness. You can control its behavior for TopK here.
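
The idea of tracking consolidatedness through a plan can be sketched on a toy tree. This is not the real `RelaxMustConsolidate` or the LIR: the `Plan` enum, its variants, and the `relax` function are hypothetical, and the sketch assumes that the topk output is consolidated because of the final consolidate added in this PR.

```rust
/// Hypothetical toy plan, not Materialize's LIR.
enum Plan {
    /// A source that is either known to be consolidated or not.
    Source { consolidated: bool },
    /// An explicit consolidation of the input.
    Consolidate(Box<Plan>),
    /// A concatenation, which may produce non-consolidated output.
    Concat(Vec<Plan>),
    /// A monotonic topk with a flag that requests input consolidation.
    MonotonicTopK { input: Box<Plan>, must_consolidate: bool },
}

/// Walk the plan bottom-up. Returns whether the output of `plan` is known to
/// be consolidated, and clears `must_consolidate` where the input already is.
fn relax(plan: &mut Plan) -> bool {
    match plan {
        Plan::Source { consolidated } => *consolidated,
        Plan::Consolidate(input) => {
            relax(input);
            true
        }
        Plan::Concat(inputs) => {
            for input in inputs.iter_mut() {
                relax(input);
            }
            // Equal records from different inputs are not merged.
            false
        }
        Plan::MonotonicTopK { input, must_consolidate } => {
            if relax(input) {
                *must_consolidate = false;
            }
            // Assumption: the operator ends with a final consolidate.
            true
        }
    }
}
```

A topk directly above a `Concat` keeps its flag, while a topk above a `Consolidate` has it cleared. Any change to what an operator emits has to be mirrored in the corresponding match arm, which is where the fragility discussed in this thread comes from.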

Member Author

Thank you, so the implementation is correct with respect to the physically monotonic interpreter. This is somewhat fragile because reasoning about whether a certain operator is monotonic or not is not simple...

Contributor

Yeah, this interpreter has to be kept up-to-date whenever any operator implementation changes. This might indeed be a little error-prone.

Comment on lines +329 to +330
// Consolidate the output of `build_topk_stage` because it's not guaranteed to be.
let oks = oks.consolidate_named::<KeyBatcher<_, _, _>>("TopK final consolidate");
Contributor

Same as above.

src/compute/src/render/top_k.rs (outdated, resolved)
We can avoid the explicit consolidation in topk rendering by reusing the
arrangement created in front of the reduction.

Signed-off-by: Moritz Hoffmann <mh@materialize.com>
@antiguru
Member Author

A nightly run does not indicate any regressions: https://buildkite.com/materialize/nightly/builds/7769

@antiguru enabled auto-merge May 14, 2024 18:46
@antiguru merged commit ff0d8eb into MaterializeInc:main May 14, 2024
72 of 73 checks passed
@antiguru deleted the topk_no_consolidate branch May 14, 2024 20:02