Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: wire up new reclock implementation #27058

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

petrosagg
Copy link
Contributor

@petrosagg petrosagg commented May 13, 2024

Motivation

With the new reclock implementation now merged, this PR integrates it into the ingestion pipeline and deletes the old one

Tips for reviewer

Checklist

@petrosagg petrosagg requested a review from a team as a code owner May 13, 2024 13:17
@petrosagg petrosagg requested a review from guswynn May 13, 2024 13:17
Copy link
Contributor

@guswynn guswynn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generally happy with this (look at the minus red!!), a few nits!

Its unclear what the ci issues are...resumption frontier issues?

@@ -35,7 +35,6 @@ pub(crate) struct GeneralSourceMetricDefs {
// Source metrics
pub(crate) capability: UIntGaugeVec,
pub(crate) resume_upper: IntGaugeVec,
pub(crate) inmemory_remap_bindings: UIntGaugeVec,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we still have some metrics we can keep track of right?

deferred_source_bindings and the remap trace lenths seem nice?

};
((output, result), from_time, diff)
})
.capture_into(PusherCapture(reclock_pusher));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

soooo much nicer!

Comment on lines +175 to +187
let resume_upper = if PartialOrder::less_equal(upper, &as_of) {
Antichain::from_elem(Timestamp::minimum())
} else {
let idx = remap_updates.partition_point(|(_, t, _)| !upper.less_than(t));
source_upper.clear();
source_upper.update_iter(
remap_updates[0..idx]
.iter()
.map(|(from_time, _, diff)| (from_time.clone(), *diff)),
);
source_upper.frontier().to_owned()
};
source_resume_uppers.insert(*id, resume_upper);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know its basic now, but can this be abstracted into a function in the mz_storage::reclock module?

@petrosagg petrosagg marked this pull request as draft May 19, 2024 18:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants