Skip to content

Commit

Permalink
Fix tiered compaction k-merge bug and use in-memory alternative (#7661)
Browse files Browse the repository at this point in the history
This PR does two things:

First, it fixes a bug with tiered compaction's k-merge implementation.
It ignored the lsn of a key during ordering, so multiple updates of the
same key could be read in arbitrary order, say from different layers.
For example there is layers `[(a, 2),(b, 3)]` and `[(a, 1),(c, 2)]` in
the heap, they might return `(a,2)` and `(a,1)`.

Ultimately, this change wasn't enough to fix the ordering issues in
#7296, in other words there is likely still bugs in the k-merge. So as
the second thing, we switch away from the k-merge to an in-memory based
one, similar to #4839, but leave the code around to be improved and
maybe switched to later on.

Part of #7296
  • Loading branch information
arpad-m committed May 9, 2024
1 parent 107f535 commit 41fb838
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 5 deletions.
9 changes: 7 additions & 2 deletions pageserver/compaction/src/compact_tiered.rs
Expand Up @@ -24,7 +24,9 @@ use tracing::{debug, info};
use std::collections::{HashSet, VecDeque};
use std::ops::Range;

use crate::helpers::{accum_key_values, keyspace_total_size, merge_delta_keys, overlaps_with};
use crate::helpers::{
accum_key_values, keyspace_total_size, merge_delta_keys_buffered, overlaps_with,
};
use crate::interface::*;
use utils::lsn::Lsn;

Expand Down Expand Up @@ -535,7 +537,10 @@ where
}
}
// Open stream
let key_value_stream = std::pin::pin!(merge_delta_keys::<E>(deltas.as_slice(), ctx));
let key_value_stream =
std::pin::pin!(merge_delta_keys_buffered::<E>(deltas.as_slice(), ctx)
.await?
.map(Result::<_, anyhow::Error>::Ok));
let mut new_jobs = Vec::new();

// Slide a window through the keyspace
Expand Down
30 changes: 27 additions & 3 deletions pageserver/compaction/src/helpers.rs
Expand Up @@ -14,6 +14,7 @@ use std::future::Future;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::task::{ready, Poll};
use utils::lsn::Lsn;

pub fn keyspace_total_size<K>(
keyspace: &CompactionKeySpace<K>,
Expand Down Expand Up @@ -109,17 +110,40 @@ pub fn merge_delta_keys<'a, E: CompactionJobExecutor>(
}
}

pub async fn merge_delta_keys_buffered<'a, E: CompactionJobExecutor + 'a>(
layers: &'a [E::DeltaLayer],
ctx: &'a E::RequestContext,
) -> anyhow::Result<impl Stream<Item = <E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>>
{
let mut keys = Vec::new();
for l in layers {
// Boxing and casting to LoadFuture is required to obtain the right Sync bound.
// If we do l.load_keys(ctx).await? directly, there is a compilation error.
let load_future: LoadFuture<'a, _> = Box::pin(l.load_keys(ctx));
keys.extend(load_future.await?.into_iter());
}
keys.sort_by_key(|k| (k.key(), k.lsn()));
let stream = futures::stream::iter(keys.into_iter());
Ok(stream)
}

enum LazyLoadLayer<'a, E: CompactionJobExecutor> {
Loaded(VecDeque<<E::DeltaLayer as CompactionDeltaLayer<E>>::DeltaEntry<'a>>),
Unloaded(&'a E::DeltaLayer),
}
impl<'a, E: CompactionJobExecutor> LazyLoadLayer<'a, E> {
fn key(&self) -> E::Key {
fn min_key(&self) -> E::Key {
match self {
Self::Loaded(entries) => entries.front().unwrap().key(),
Self::Unloaded(dl) => dl.key_range().start,
}
}
fn min_lsn(&self) -> Lsn {
match self {
Self::Loaded(entries) => entries.front().unwrap().lsn(),
Self::Unloaded(dl) => dl.lsn_range().start,
}
}
}
impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Expand All @@ -129,12 +153,12 @@ impl<'a, E: CompactionJobExecutor> PartialOrd for LazyLoadLayer<'a, E> {
impl<'a, E: CompactionJobExecutor> Ord for LazyLoadLayer<'a, E> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// reverse order so that we get a min-heap
other.key().cmp(&self.key())
(other.min_key(), other.min_lsn()).cmp(&(self.min_key(), self.min_lsn()))
}
}
impl<'a, E: CompactionJobExecutor> PartialEq for LazyLoadLayer<'a, E> {
fn eq(&self, other: &Self) -> bool {
self.key().eq(&other.key())
self.cmp(other) == std::cmp::Ordering::Equal
}
}
impl<'a, E: CompactionJobExecutor> Eq for LazyLoadLayer<'a, E> {}
Expand Down

1 comment on commit 41fb838

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3105 tests run: 2959 passed, 0 failed, 146 skipped (full report)


Flaky tests (2)

Postgres 16

  • test_gc_aggressive: debug

Postgres 14

  • test_partial_evict_tenant[relative_spare]: release

Code coverage* (full report)

  • functions: 31.4% (6321 of 20132 functions)
  • lines: 47.3% (47632 of 100707 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
41fb838 at 2024-05-09T15:25:42.825Z :recycle:

Please sign in to comment.