Tokio based walredo #2875

Closed
wants to merge 12 commits
26 changes: 26 additions & 0 deletions Cargo.lock


5 changes: 5 additions & 0 deletions libs/pageserver_api/src/reltag.rs
@@ -32,6 +32,11 @@ pub struct RelTag {
     pub relnode: Oid,
 }
 
+impl RelTag {
+    /// Serialized length.
+    pub const LEN: u32 = 1 + 4 + 4 + 4;
+}
+
 impl PartialOrd for RelTag {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         Some(self.cmp(other))
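(Note, not part of the diff: the constant presumably covers RelTag's four fields, a 1-byte fork number plus the three 4-byte Oids visible above, so the serialized length comes to 13 bytes. A hypothetical breakdown under that assumption:)

    // Hypothetical illustration of where 1 + 4 + 4 + 4 comes from; the field
    // layout (forknum: u8, then spcnode/dbnode/relnode as u32 Oids) is assumed
    // from the surrounding struct, not stated in this hunk.
    const FORKNUM_LEN: u32 = 1;     // u8 fork number
    const OID_LEN: u32 = 4;         // Oid is a u32
    const LEN: u32 = FORKNUM_LEN + 3 * OID_LEN; // = 13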
1 change: 1 addition & 0 deletions pageserver/Cargo.toml
@@ -25,6 +25,7 @@ const_format = "0.2.21"
 crc32c = "0.6.0"
 crossbeam-utils = "0.8.5"
 fail = "0.5.0"
+flume = "0.10.14"
 futures = "0.3.13"
 git-version = "0.3.5"
 hex = "0.4.3"
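(Note on the new dependency, an inference rather than something the PR states: flume provides MPMC channels with both blocking and async send/receive, which fits a design where synchronous tenant code hands requests to a Tokio-based walredo pool. A minimal usage sketch, illustrative only and not the PR's actual wiring:)

    use std::thread;

    fn main() {
        // The same flume channel can serve sync producers and async consumers;
        // both ends are sync here for brevity.
        let (tx, rx) = flume::bounded::<&'static str>(16);

        let producer = thread::spawn(move || {
            tx.send("redo request").unwrap(); // blocking send from a plain thread
        });

        // Blocking receive; an async consumer would use `rx.recv_async().await`.
        assert_eq!(rx.recv().unwrap(), "redo request");
        producer.join().unwrap();
    }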
128 changes: 84 additions & 44 deletions pageserver/benches/bench_walredo.rs
@@ -30,33 +30,48 @@ fn redo_scenarios(c: &mut Criterion) {
     let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
     let conf = Box::leak(Box::new(conf));
     let tenant_id = TenantId::generate();
-    // std::fs::create_dir_all(conf.tenant_path(&tenant_id)).unwrap();
-    let mut manager = PostgresRedoManager::new(conf, tenant_id);
-    manager.launch_process(14).unwrap();
+
+    let manager =
+        PostgresRedoManager::multiprocess(conf, tenant_id, std::num::NonZeroUsize::new(4).unwrap());
 
     let manager = Arc::new(manager);
 
     tracing::info!("executing first");
     short().execute(&manager).unwrap();
     tracing::info!("first executed");
 
     let thread_counts = [1, 2, 4, 8, 16];
 
+    let mut group = c.benchmark_group("short");
+    group.sampling_mode(criterion::SamplingMode::Flat);
+
     for thread_count in thread_counts {
-        c.bench_with_input(
-            BenchmarkId::new("short-50record", thread_count),
+        group.bench_with_input(
+            BenchmarkId::new("short", thread_count),
             &thread_count,
             |b, thread_count| {
-                add_multithreaded_walredo_requesters(b, *thread_count, &manager, short, 50);
+                add_multithreaded_walredo_requesters(b, *thread_count, &manager, short);
             },
         );
     }
+    drop(group);
+
+    let mut group = c.benchmark_group("medium");
+    group.sampling_mode(criterion::SamplingMode::Flat);
 
     for thread_count in thread_counts {
-        c.bench_with_input(
-            BenchmarkId::new("medium-10record", thread_count),
+        group.bench_with_input(
+            BenchmarkId::new("medium", thread_count),
             &thread_count,
             |b, thread_count| {
-                add_multithreaded_walredo_requesters(b, *thread_count, &manager, medium, 10);
+                add_multithreaded_walredo_requesters(b, *thread_count, &manager, medium);
             },
         );
     }
+    drop(group);
+
+    let fut = pageserver::task_mgr::shutdown_tasks(None, Some(tenant_id), None);
+    pageserver::task_mgr::WALREDO_RUNTIME.block_on(fut);
 }

/// Sets up `threads` number of requesters to `request_redo`, with the given input.
@@ -65,46 +80,66 @@ fn add_multithreaded_walredo_requesters(
     threads: u32,
     manager: &Arc<PostgresRedoManager>,
     input_factory: fn() -> Request,
-    request_repeats: usize,
 ) {
-    b.iter_batched_ref(
-        || {
-            // barrier for all of the threads, and the benchmarked thread
-            let barrier = Arc::new(Barrier::new(threads as usize + 1));
-
-            let jhs = (0..threads)
-                .map(|_| {
-                    std::thread::spawn({
-                        let manager = manager.clone();
-                        let barrier = barrier.clone();
-                        move || {
-                            let input = std::iter::repeat(input_factory())
-                                .take(request_repeats)
-                                .collect::<Vec<_>>();
-
-                            barrier.wait();
-
-                            execute_all(input, &*manager).unwrap();
-
-                            barrier.wait();
-                        }
-                    })
-                })
-                .collect::<Vec<_>>();
-
-            (barrier, JoinOnDrop(jhs))
-        },
-        |input| {
-            let barrier = &input.0;
-
-            // start the work
-            barrier.wait();
-
-            // wait for work to complete
-            barrier.wait();
-        },
-        criterion::BatchSize::PerIteration,
-    );
+    assert_ne!(threads, 0);
+
+    if threads == 1 {
+        b.iter_batched_ref(
+            || Some(input_factory()),
+            |input| execute_all(input.take(), &*manager),
+            criterion::BatchSize::PerIteration,
+        );
+    } else {
+        let (work_tx, work_rx) = std::sync::mpsc::sync_channel(threads as usize);
+
+        let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx));
+
+        let barrier = Arc::new(Barrier::new(threads as usize + 1));
+
+        let jhs = (0..threads)
+            .map(|_| {
+                std::thread::spawn({
+                    let manager = manager.clone();
+                    let barrier = barrier.clone();
+                    let work_rx = work_rx.clone();
+                    move || loop {
+                        // queue up and wait if we want to go another round
+                        if work_rx.lock().unwrap().recv().is_err() {
+                            break;
+                        }
+
+                        let input = Some(input_factory());
+
+                        barrier.wait();
+
+                        execute_all(input, &*manager).unwrap();
+
+                        barrier.wait();
+                    }
+                })
+            })
+            .collect::<Vec<_>>();
+
+        let _jhs = JoinOnDrop(jhs);
+
+        b.iter_batched(
+            || {
+                for _ in 0..threads {
+                    work_tx.send(()).unwrap()
+                }
+            },
+            |()| {
+                // start the work
+                barrier.wait();
+
+                // wait for work to complete
+                barrier.wait();
+            },
+            criterion::BatchSize::PerIteration,
+        );
+
+        drop(work_tx);
+    }
 }

struct JoinOnDrop(Vec<std::thread::JoinHandle<()>>);
@@ -121,7 +156,10 @@ impl Drop for JoinOnDrop {
     }
 }
 
-fn execute_all(input: Vec<Request>, manager: &PostgresRedoManager) -> Result<(), WalRedoError> {
+fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> Result<(), WalRedoError>
+where
+    I: IntoIterator<Item = Request>,
+{
     // just fire all requests as fast as possible
     input.into_iter().try_for_each(|req| {
         let page = req.execute(manager)?;
@@ -143,6 +181,7 @@ macro_rules! lsn {
     }};
 }
 
+/// Short payload, 1132 bytes.
 // pg_records are copypasted from log, where they are put with Debug impl of Bytes, which uses \0
 // for null bytes.
 #[allow(clippy::octal_escapes)]
@@ -172,6 +211,7 @@ fn short() -> Request {
     }
 }
 
+/// Medium sized payload, serializes as 26393 bytes.
 // see [`short`]
 #[allow(clippy::octal_escapes)]
 fn medium() -> Request {
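(Note, not part of the diff: the benchmark now uses criterion benchmark groups with SamplingMode::Flat, which keeps the iteration count constant per sample and suits long-running, process-backed work. The multi-threaded path in `add_multithreaded_walredo_requesters` can be hard to follow in diff form; the standalone sketch below shows the same coordination pattern with placeholder work: a bounded channel hands each worker one ticket per measured iteration, a Barrier brackets the timed region, and dropping the sender shuts the workers down.)

    use std::sync::{mpsc, Arc, Barrier, Mutex};

    fn main() {
        let threads = 4;

        // One ticket per worker per iteration; a closed channel means shutdown.
        let (work_tx, work_rx) = mpsc::sync_channel::<()>(threads);
        let work_rx = Arc::new(Mutex::new(work_rx));

        // All workers plus the coordinating thread meet at the barrier.
        let barrier = Arc::new(Barrier::new(threads + 1));

        let handles: Vec<_> = (0..threads)
            .map(|_| {
                let barrier = Arc::clone(&barrier);
                let work_rx = Arc::clone(&work_rx);
                std::thread::spawn(move || loop {
                    if work_rx.lock().unwrap().recv().is_err() {
                        break; // sender dropped: no more iterations
                    }
                    barrier.wait(); // start of the timed region
                    // ... one unit of placeholder work would run here ...
                    barrier.wait(); // end of the timed region
                })
            })
            .collect();

        // One measured iteration: hand out tickets, then time between barriers.
        for _ in 0..threads {
            work_tx.send(()).unwrap();
        }
        barrier.wait(); // release the workers
        barrier.wait(); // wait for all of them to finish

        drop(work_tx); // close the channel so the worker loops exit
        for h in handles {
            h.join().unwrap();
        }
    }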
Binary file added pageserver/fixtures/short_v14_redo.page
9 changes: 0 additions & 9 deletions pageserver/src/metrics.rs
@@ -316,15 +316,6 @@ pub static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
     .expect("failed to define a metric")
 });
 
-pub static WAL_REDO_WAIT_TIME: Lazy<Histogram> = Lazy::new(|| {
-    register_histogram!(
-        "pageserver_wal_redo_wait_seconds",
-        "Time spent waiting for access to the WAL redo process",
-        redo_histogram_time_buckets!(),
-    )
-    .expect("failed to define a metric")
-});
-
 pub static WAL_REDO_RECORDS_HISTOGRAM: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "pageserver_wal_redo_records_histogram",