diff --git a/Cargo.lock b/Cargo.lock index 12ab6f17aacd..3fac7dd2c097 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1404,6 +1404,19 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "pin-project", + "spin 0.9.4", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1565,8 +1578,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -2172,6 +2187,15 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -2447,6 +2471,7 @@ dependencies = [ "crossbeam-utils", "etcd_broker", "fail", + "flume", "futures", "git-version", "hex", @@ -4896,6 +4921,7 @@ dependencies = [ "futures-channel", "futures-task", "futures-util", + "getrandom", "hashbrown", "indexmap", "libc", diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 43d38bd9862e..c5117a98bce8 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -32,6 +32,11 @@ pub struct RelTag { pub relnode: Oid, } +impl RelTag { + /// Serialized length. + pub const LEN: u32 = 1 + 4 + 4 + 4; +} + impl PartialOrd for RelTag { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 43d51f90c1ec..0c5d67be7223 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -25,6 +25,7 @@ const_format = "0.2.21" crc32c = "0.6.0" crossbeam-utils = "0.8.5" fail = "0.5.0" +flume = "0.10.14" futures = "0.3.13" git-version = "0.3.5" hex = "0.4.3" diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 85caa565feb6..8dac6f834e47 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -30,33 +30,48 @@ fn redo_scenarios(c: &mut Criterion) { let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); let conf = Box::leak(Box::new(conf)); let tenant_id = TenantId::generate(); - // std::fs::create_dir_all(conf.tenant_path(&tenant_id)).unwrap(); - let mut manager = PostgresRedoManager::new(conf, tenant_id); - manager.launch_process(14).unwrap(); + + let manager = + PostgresRedoManager::multiprocess(conf, tenant_id, std::num::NonZeroUsize::new(4).unwrap()); let manager = Arc::new(manager); + tracing::info!("executing first"); + short().execute(&manager).unwrap(); + tracing::info!("first executed"); + let thread_counts = [1, 2, 4, 8, 16]; + let mut group = c.benchmark_group("short"); + group.sampling_mode(criterion::SamplingMode::Flat); + for thread_count in thread_counts { - c.bench_with_input( - BenchmarkId::new("short-50record", thread_count), + group.bench_with_input( + BenchmarkId::new("short", thread_count), &thread_count, |b, thread_count| { - add_multithreaded_walredo_requesters(b, *thread_count, &manager, short, 50); + add_multithreaded_walredo_requesters(b, *thread_count, &manager, short); }, ); } + drop(group); + + let mut group = c.benchmark_group("medium"); + group.sampling_mode(criterion::SamplingMode::Flat); for thread_count in thread_counts { - c.bench_with_input( - BenchmarkId::new("medium-10record", thread_count), + group.bench_with_input( + BenchmarkId::new("medium", thread_count), &thread_count, |b, thread_count| { - add_multithreaded_walredo_requesters(b, *thread_count, &manager, medium, 10); + add_multithreaded_walredo_requesters(b, *thread_count, &manager, medium); }, ); } + drop(group); + + let fut = pageserver::task_mgr::shutdown_tasks(None, Some(tenant_id), None); + pageserver::task_mgr::WALREDO_RUNTIME.block_on(fut); } /// Sets up `threads` number of requesters to `request_redo`, with the given input. @@ -65,46 +80,66 @@ fn add_multithreaded_walredo_requesters( threads: u32, manager: &Arc, input_factory: fn() -> Request, - request_repeats: usize, ) { - b.iter_batched_ref( - || { - // barrier for all of the threads, and the benchmarked thread - let barrier = Arc::new(Barrier::new(threads as usize + 1)); - - let jhs = (0..threads) - .map(|_| { - std::thread::spawn({ - let manager = manager.clone(); - let barrier = barrier.clone(); - move || { - let input = std::iter::repeat(input_factory()) - .take(request_repeats) - .collect::>(); - - barrier.wait(); - - execute_all(input, &*manager).unwrap(); - - barrier.wait(); + assert_ne!(threads, 0); + + if threads == 1 { + b.iter_batched_ref( + || Some(input_factory()), + |input| execute_all(input.take(), &*manager), + criterion::BatchSize::PerIteration, + ); + } else { + let (work_tx, work_rx) = std::sync::mpsc::sync_channel(threads as usize); + + let work_rx = std::sync::Arc::new(std::sync::Mutex::new(work_rx)); + + let barrier = Arc::new(Barrier::new(threads as usize + 1)); + + let jhs = (0..threads) + .map(|_| { + std::thread::spawn({ + let manager = manager.clone(); + let barrier = barrier.clone(); + let work_rx = work_rx.clone(); + move || loop { + // queue up and wait if we want to go another round + if work_rx.lock().unwrap().recv().is_err() { + break; } - }) + + let input = Some(input_factory()); + + barrier.wait(); + + execute_all(input, &*manager).unwrap(); + + barrier.wait(); + } }) - .collect::>(); + }) + .collect::>(); - (barrier, JoinOnDrop(jhs)) - }, - |input| { - let barrier = &input.0; + let _jhs = JoinOnDrop(jhs); + + b.iter_batched( + || { + for _ in 0..threads { + work_tx.send(()).unwrap() + } + }, + |()| { + // start the work + barrier.wait(); - // start the work - barrier.wait(); + // wait for work to complete + barrier.wait(); + }, + criterion::BatchSize::PerIteration, + ); - // wait for work to complete - barrier.wait(); - }, - criterion::BatchSize::PerIteration, - ); + drop(work_tx); + } } struct JoinOnDrop(Vec>); @@ -121,7 +156,10 @@ impl Drop for JoinOnDrop { } } -fn execute_all(input: Vec, manager: &PostgresRedoManager) -> Result<(), WalRedoError> { +fn execute_all(input: I, manager: &PostgresRedoManager) -> Result<(), WalRedoError> +where + I: IntoIterator, +{ // just fire all requests as fast as possible input.into_iter().try_for_each(|req| { let page = req.execute(manager)?; @@ -143,6 +181,7 @@ macro_rules! lsn { }}; } +/// Short payload, 1132 bytes. // pg_records are copypasted from log, where they are put with Debug impl of Bytes, which uses \0 // for null bytes. #[allow(clippy::octal_escapes)] @@ -172,6 +211,7 @@ fn short() -> Request { } } +/// Medium sized payload, serializes as 26393 bytes. // see [`short`] #[allow(clippy::octal_escapes)] fn medium() -> Request { diff --git a/pageserver/fixtures/short_v14_redo.page b/pageserver/fixtures/short_v14_redo.page new file mode 100644 index 000000000000..9e9c266cadf2 Binary files /dev/null and b/pageserver/fixtures/short_v14_redo.page differ diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 454ff01f0e3f..fe2cc8463570 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -316,15 +316,6 @@ pub static WAL_REDO_TIME: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); -pub static WAL_REDO_WAIT_TIME: Lazy = Lazy::new(|| { - register_histogram!( - "pageserver_wal_redo_wait_seconds", - "Time spent waiting for access to the WAL redo process", - redo_histogram_time_buckets!(), - ) - .expect("failed to define a metric") -}); - pub static WAL_REDO_RECORDS_HISTOGRAM: Lazy = Lazy::new(|| { register_histogram!( "pageserver_wal_redo_records_histogram", diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 036fb14e9b62..63d8fbf0b899 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -12,6 +12,7 @@ use anyhow::{bail, ensure, Context, Result}; use bytes::Buf; use bytes::Bytes; +use futures::stream::FuturesOrdered; use futures::{Stream, StreamExt}; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, @@ -26,6 +27,7 @@ use std::str; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use tokio::pin; use tokio_util::io::StreamReader; use tokio_util::io::SyncIoBridge; @@ -56,6 +58,9 @@ use crate::CheckpointConfig; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; +/// Number of requests to process in parallel, from a single connection +const MAX_INFLIGHT_REQUESTS: usize = 4; + fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream> + '_ { async_stream::try_stream! { loop { @@ -226,6 +231,13 @@ struct PageRequestMetrics { get_db_size: metrics::Histogram, } +pub enum RequestType { + Exists, + Nblocks, + GetPage, + DbSize, +} + impl PageRequestMetrics { fn new(tenant_id: &TenantId, timeline_id: &TimelineId) -> Self { let tenant_id = tenant_id.to_string(); @@ -300,67 +312,100 @@ impl PageServerHandler { let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id); + // + // Main loop to handle the stream of requests + // + // We process multiple requests in parallel, by spawning a new Task for each + // incoming request. + let mut inprogress_requests = FuturesOrdered::new(); loop { - let msg = tokio::select! { + tokio::select! { biased; + // If we were requested to shut down, stop _ = task_mgr::shutdown_watcher() => { - // We were requested to shut down. info!("shutdown request received in page handler"); break; } - msg = pgb.read_message() => { msg } - }; - - let copy_data_bytes = match msg? { - Some(FeMessage::CopyData(bytes)) => bytes, - Some(FeMessage::Terminate) => break, - Some(m) => { - bail!("unexpected message: {m:?} during COPY"); - } - None => break, // client disconnected - }; + // When a task completes, send the response to the client + completed_task = inprogress_requests.next(), if !inprogress_requests.is_empty() => { + let response: Bytes; + let request_type: RequestType; + let elapsed_sec: f64; + (response, request_type, elapsed_sec) = completed_task.unwrap()?; + pgb.write_message(&BeMessage::CopyData(&response))?; + pgb.flush().await?; - trace!("query: {copy_data_bytes:?}"); + match request_type { + RequestType::Exists => metrics.get_rel_exists.observe(elapsed_sec), + RequestType::Nblocks => metrics.get_rel_size.observe(elapsed_sec), + RequestType::GetPage => metrics.get_page_at_lsn.observe(elapsed_sec), + RequestType::DbSize => metrics.get_db_size.observe(elapsed_sec), + } - // Trace request if needed - if let Some(t) = tracer.as_mut() { - t.trace(©_data_bytes) - } + continue; + } - let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + // When a new request arrives, spawn a task to process it. + // If we already have MAX_INFLIGHT_REQUESTS requests in-progress, however, + // don't start new ones. + msg = pgb.read_message(), if inprogress_requests.len() < MAX_INFLIGHT_REQUESTS => { + let copy_data_bytes = match msg? { + Some(FeMessage::CopyData(bytes)) => bytes, + Some(m) => { + bail!("unexpected message: {m:?} during COPY"); + } + None => break, // client disconnected + }; - let response = match neon_fe_msg { - PagestreamFeMessage::Exists(req) => { - let _timer = metrics.get_rel_exists.start_timer(); - self.handle_get_rel_exists_request(&timeline, &req).await - } - PagestreamFeMessage::Nblocks(req) => { - let _timer = metrics.get_rel_size.start_timer(); - self.handle_get_nblocks_request(&timeline, &req).await - } - PagestreamFeMessage::GetPage(req) => { - let _timer = metrics.get_page_at_lsn.start_timer(); - self.handle_get_page_at_lsn_request(&timeline, &req).await - } - PagestreamFeMessage::DbSize(req) => { - let _timer = metrics.get_db_size.start_timer(); - self.handle_db_size_request(&timeline, &req).await + trace!("query: {copy_data_bytes:?}"); + + // Trace request if needed + if let Some(t) = tracer.as_mut() { + t.trace(©_data_bytes) + } + + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; + + let timeline = Arc::clone(&timeline); + let conf = self.conf; + let task = async move { + let start_time = Instant::now(); + let (response, request_type) = match neon_fe_msg { + PagestreamFeMessage::Exists(req) => { + (Self::handle_get_rel_exists_request(&timeline, &req).await, + RequestType::Exists) + } + PagestreamFeMessage::Nblocks(req) => { + (Self::handle_get_nblocks_request(&timeline, &req).await, + RequestType::Nblocks) + } + PagestreamFeMessage::GetPage(req) => { + (Self::handle_get_page_at_lsn_request(conf, &timeline, &req).await, + RequestType::GetPage) + } + PagestreamFeMessage::DbSize(req) => { + (Self::handle_db_size_request(&timeline, &req).await, + RequestType::DbSize) + } + }; + + let response = response.unwrap_or_else(|e| { + // print the all details to the log with {:#}, but for the client the + // error message is enough + error!("error reading relation or page version: {:?}", e); + PagestreamBeMessage::Error(PagestreamErrorResponse { + message: e.to_string(), + }) + }); + let response: Bytes = response.serialize(); + (response, request_type, start_time.elapsed().as_secs_f64()) + }; + inprogress_requests.push_back(tokio::spawn(task)); + continue; } }; - - let response = response.unwrap_or_else(|e| { - // print the all details to the log with {:#}, but for the client the - // error message is enough - error!("error reading relation or page version: {:?}", e); - PagestreamBeMessage::Error(PagestreamErrorResponse { - message: e.to_string(), - }) - }); - - pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; - pgb.flush().await?; } Ok(()) } @@ -532,9 +577,8 @@ impl PageServerHandler { Ok(lsn) } - #[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] async fn handle_get_rel_exists_request( - &self, timeline: &Timeline, req: &PagestreamExistsRequest, ) -> Result { @@ -549,9 +593,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))] async fn handle_get_nblocks_request( - &self, timeline: &Timeline, req: &PagestreamNblocksRequest, ) -> Result { @@ -566,9 +609,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))] + #[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))] async fn handle_db_size_request( - &self, timeline: &Timeline, req: &PagestreamDbSizeRequest, ) -> Result { @@ -586,11 +628,11 @@ impl PageServerHandler { })) } - #[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))] - async fn handle_get_page_at_lsn_request( - &self, - timeline: &Timeline, - req: &PagestreamGetPageRequest, + #[instrument(skip(conf, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))] + async fn handle_get_page_at_lsn_request<'a>( + conf: &'static PageServerConf, + timeline: &'a Timeline, + req: &'a PagestreamGetPageRequest, ) -> Result { let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn) @@ -607,7 +649,7 @@ impl PageServerHandler { // FIXME: this profiling now happens at different place than it used to. The // current profiling is based on a thread-local variable, so it doesn't work // across awaits - let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests); + let _profiling_guard = profpoint_start(conf, ProfilingConfig::PageRequests); let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?; Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { @@ -615,9 +657,8 @@ impl PageServerHandler { })) } - #[instrument(skip(self, pgb))] + #[instrument(skip(pgb))] async fn handle_basebackup_request( - &self, pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, @@ -750,8 +791,7 @@ impl postgres_backend_async::Handler for PageServerHandler { }; // Check that the timeline exists - self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false) - .await?; + Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false).await?; pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } // return pair of prev_lsn and last_lsn @@ -810,7 +850,7 @@ impl postgres_backend_async::Handler for PageServerHandler { self.check_permission(Some(tenant_id))?; // Check that the timeline exists - self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true) + Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true) .await?; pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("import basebackup ") { diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 86d1266f09f6..56514bcf7e4e 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -135,6 +135,20 @@ pub static BACKGROUND_RUNTIME: Lazy = Lazy::new(|| { .expect("Failed to create background op runtime") }); +/// [`crate::walredo::PostgresRedoManager`] runtime. +/// +/// wal-redo uses it's own runtime to avoid deadlocks with the blocking code, see issue +/// . It is important that tasks running on this +/// runtime are not blocking. +pub static WALREDO_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("walredo worker") + .worker_threads(1) + .enable_all() + .build() + .expect("Failed to create walredo runtime") +}); + pub struct PageserverTaskId(u64); /// Each task that we track is associated with a "task ID". It's just an @@ -205,6 +219,13 @@ pub enum TaskKind { // task that handles attaching a tenant Attach, + + /// One or more external processes producing pages out of collected [`NeonWalRecord`]s, see + /// [`PostgresRedoManager`]. + /// + /// [`NeonWalRecord`]: crate::walrecord::NeonWalRecord + /// [`PostgresRedoManager`]: crate::walredo::PostgresRedoManager + WalRedo, } #[derive(Default)] diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 378f8deed748..f131f616637f 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -18,34 +18,27 @@ //! any WAL records, so that even if an attacker hijacks the Postgres //! process, he cannot escape out of it. //! +use anyhow::Context; use byteorder::{ByteOrder, LittleEndian}; -use bytes::{BufMut, Bytes, BytesMut}; -use nix::poll::*; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use serde::Serialize; -use std::fs::OpenOptions; -use std::io::prelude::*; -use std::io::{Error, ErrorKind}; -use std::ops::{Deref, DerefMut}; -use std::os::unix::io::AsRawFd; -use std::os::unix::prelude::CommandExt; -use std::path::PathBuf; +use std::num::NonZeroUsize; use std::process::Stdio; -use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command}; -use std::sync::Mutex; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use std::time::Duration; use std::time::Instant; -use std::{fs, io}; +use tokio_util::sync::CancellationToken; use tracing::*; use utils::crashsafe::path_with_suffix_extension; -use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock}; +use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn}; use crate::metrics::{ WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME, - WAL_REDO_WAIT_TIME, }; use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block}; use crate::repository::Key; -use crate::task_mgr::BACKGROUND_RUNTIME; +use crate::task_mgr::WALREDO_RUNTIME; use crate::walrecord::NeonWalRecord; use crate::{config::PageServerConf, TEMP_FILE_SUFFIX}; use pageserver_api::reltag::{RelTag, SlruKind}; @@ -69,6 +62,11 @@ pub struct BufferTag { pub blknum: u32, } +impl BufferTag { + /// Serialized length + pub const LEN: u32 = RelTag::LEN + 4; +} + /// /// WAL Redo Manager is responsible for replaying WAL records. /// @@ -98,10 +96,8 @@ pub trait WalRedoManager: Send + Sync { /// records. /// pub struct PostgresRedoManager { - tenant_id: TenantId, conf: &'static PageServerConf, - - process: Mutex>, + handle: Handle, } /// Can this request be served by neon redo functions @@ -156,6 +152,9 @@ impl WalRedoManager for PostgresRedoManager { return Err(WalRedoError::InvalidRequest); } + // convert it to an arc to avoid cloning it on batches + let records: Arc<[(Lsn, NeonWalRecord)]> = records.into(); + let mut img: Option = base_img; let mut batch_neon = can_apply_in_neon(&records[0].1); let mut batch_start = 0; @@ -170,7 +169,8 @@ impl WalRedoManager for PostgresRedoManager { key, lsn, img, - &records[batch_start..i], + &records, + (batch_start..i).into(), self.conf.wal_redo_timeout, pg_version, ) @@ -189,7 +189,8 @@ impl WalRedoManager for PostgresRedoManager { key, lsn, img, - &records[batch_start..], + &records, + (batch_start..).into(), self.conf.wal_redo_timeout, pg_version, ) @@ -202,33 +203,42 @@ impl PostgresRedoManager { /// Create a new PostgresRedoManager. /// pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager { - // The actual process is launched lazily, on first request. - PostgresRedoManager { - tenant_id, - conf, - process: Mutex::new(None), - } + Self::multiprocess(conf, tenant_id, NonZeroUsize::new(1).unwrap()) } - /// Launch process pre-emptively. Should not be needed except for benchmarking. - pub fn launch_process(&mut self, pg_version: u32) -> anyhow::Result<()> { - let inner = self.process.get_mut().unwrap(); - if inner.is_none() { - let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?; - *inner = Some(p); - } - Ok(()) + /// Create a postgres redo manager with given number of maximum processes. + pub fn multiprocess( + conf: &'static PageServerConf, + tenant_id: TenantId, + max_processes: NonZeroUsize, + ) -> PostgresRedoManager { + // The actual process is launched lazily, on first request. + let h = WALREDO_RUNTIME.handle(); + let (handle, fut) = tokio_postgres_redo(conf, tenant_id, h, max_processes); + crate::task_mgr::spawn( + h, + crate::task_mgr::TaskKind::WalRedo, + Some(tenant_id), + None, + "walredo", + true, + fut, + ); + + PostgresRedoManager { conf, handle } } /// /// Process one request for WAL redo using wal-redo postgres /// + #[allow(clippy::too_many_arguments)] fn apply_batch_postgres( &self, key: Key, lsn: Lsn, base_img: Option, - records: &[(Lsn, NeonWalRecord)], + records: &Arc<[(Lsn, NeonWalRecord)]>, + records_range: SliceRange, wal_redo_timeout: Duration, pg_version: u32, ) -> Result { @@ -236,29 +246,31 @@ impl PostgresRedoManager { let start_time = Instant::now(); - let mut process_guard = self.process.lock().unwrap(); - let lock_time = Instant::now(); - - // launch the WAL redo process on first use - if process_guard.is_none() { - let p = PostgresRedoProcess::launch(self.conf, self.tenant_id, pg_version)?; - *process_guard = Some(p); - } - let process = process_guard.as_mut().unwrap(); - - WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64()); - // Relational WAL records are applied using wal-redo-postgres let buf_tag = BufferTag { rel, blknum }; - let result = process - .apply_wal_records(buf_tag, base_img, records, wal_redo_timeout) - .map_err(WalRedoError::IoError); - let end_time = Instant::now(); - let duration = end_time.duration_since(lock_time); + let consumed_records = records_range.sub_slice(records); + let record_count = consumed_records.len() as u64; + + // while walredo processes are async, and execute on separate thread, this is mostly called + // from within async tasks but as a blocking invocation. + let result = self + .handle + .request_redo(Request { + target: buf_tag, + base_img, + records: records.clone(), + records_range, + timeout: wal_redo_timeout, + pg_version, + requeues: 0, + }) + .map_err(|e| WalRedoError::IoError(std::io::Error::new(std::io::ErrorKind::Other, e))); - let len = records.len(); - let nbytes = records.iter().fold(0, |acumulator, record| { + let duration = start_time.elapsed(); + + let len = record_count; + let nbytes = consumed_records.iter().fold(0, |acumulator, record| { acumulator + match &record.1 { NeonWalRecord::Postgres { rec, .. } => rec.len(), @@ -269,6 +281,7 @@ impl PostgresRedoManager { WAL_REDO_TIME.observe(duration.as_secs_f64()); WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64); WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64); + WAL_REDO_RECORD_COUNTER.inc_by(record_count); debug!( "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}", @@ -278,18 +291,6 @@ impl PostgresRedoManager { lsn ); - // If something went wrong, don't try to reuse the process. Kill it, and - // next request will launch a new one. - if result.is_err() { - error!( - "error applying {} WAL records ({} bytes) to reconstruct page image at LSN {}", - records.len(), - nbytes, - lsn - ); - let process = process_guard.take().unwrap(); - process.kill(); - } result } @@ -553,18 +554,13 @@ impl PostgresRedoManager { } } -/// /// Command with ability not to give all file descriptors to child process -/// -trait CloseFileDescriptors: CommandExt { - /// - /// Close file descriptors (other than stdin, stdout, stderr) in child process - /// - fn close_fds(&mut self) -> &mut Command; +trait CloseFileDescriptors { + fn close_fds(&mut self) -> &mut tokio::process::Command; } -impl CloseFileDescriptors for C { - fn close_fds(&mut self) -> &mut Command { +impl CloseFileDescriptors for tokio::process::Command { + fn close_fds(&mut self) -> &mut tokio::process::Command { unsafe { self.pre_exec(move || { // SAFETY: Code executed inside pre_exec should have async-signal-safety, @@ -587,418 +583,1506 @@ impl CloseFileDescriptors for C { } } -/// -/// Handle to the Postgres WAL redo process -/// -struct PostgresRedoProcess { +fn tokio_postgres_redo( + conf: &'static PageServerConf, tenant_id: TenantId, - child: NoLeakChild, - stdin: ChildStdin, - stdout: ChildStdout, - stderr: ChildStderr, -} - -impl PostgresRedoProcess { - // - // Start postgres binary in special WAL redo mode. - // - #[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))] - fn launch( - conf: &PageServerConf, - tenant_id: TenantId, - pg_version: u32, - ) -> Result { - // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we - // just create one with constant name. That fails if you try to launch more than - // one WAL redo manager concurrently. - let datadir = path_with_suffix_extension( - conf.tenant_path(&tenant_id).join("wal-redo-datadir"), - TEMP_FILE_SUFFIX, - ); + _handle: &tokio::runtime::Handle, + max_processes: NonZeroUsize, +) -> ( + Handle, + impl std::future::Future> + Send + 'static, +) { + // outer mpmc queue size + let expected_inflight = 4; + // intermediate queue size, max pipelined. there might be a different kind of balance needed if + // multiple processes are used. this already allows for 8 waiting request_redo callers, but + // with 4 processes it might mean that only one of them is actually busy. + let expected_pipelined = 4; + + let capacity = expected_inflight + expected_pipelined; + let (tx, rx) = flume::bounded::(capacity); + + let feedback = Arc::new(Feedback::new(capacity)); + + let handle = Handle { + tx: tx.clone(), + feedback: feedback.clone(), + }; + + let ipc = async move { + while let Ok(first) = { + let recv_next = rx.recv_async(); + tokio::pin!(recv_next); + + let watcher = crate::task_mgr::shutdown_watcher(); + tokio::pin!(watcher); + + tokio::select! { + recv = &mut recv_next => { + recv.map_err(|_| "handle dropped while waiting for first") + }, + _ = &mut watcher => { + Err("tenant shutdown while waiting for first item") + } + } + } { + let pg_version = first.0.pg_version; - // Create empty data directory for wal-redo postgres, deleting old one first. - if datadir.exists() { - info!( - "old temporary datadir {} exists, removing", - datadir.display() - ); - fs::remove_dir_all(&datadir)?; + let watcher = crate::task_mgr::shutdown_watcher(); + tokio::pin!(watcher); + + let mut first = Some(first); + + // counter to keep walredo paths unique, but also reuse them + let mut nth_tenant_process = 0; + + let mut control = Control::default(); + + loop { + // simple control loop for handling number of processes which takes in as inputs: + // + // - `notify_full` from the Handle when the queue might be at the limit + // - `notify_not_full` from the Handle which is signalled on every time something + // was pushed to the queue + // - `notify_empty` from walredo tasks, which they signal every next request + // timeout + // + // this works only at the proof of concept level for scaling up. + + match control + .iteration(&feedback, &mut watcher, first.is_some(), &max_processes) + .await + { + std::ops::ControlFlow::Break(()) => break, + std::ops::ControlFlow::Continue(true) => { + // TODO(control): this is probably an issue that we "halt" the control loop + // for the duration of the launching. + let child = + launch_walredo(conf, tenant_id, pg_version, nth_tenant_process).await?; + + nth_tenant_process += 1; + + let fut = walredo_rpc( + child, + tx.clone(), + rx.clone(), + expected_pipelined, + first.take(), + control.cancel.clone(), + feedback.clone(), + ) + .in_current_span(); + + control.js.spawn(fut); + info!("now running with {} processes", control.js.len()); + } + std::ops::ControlFlow::Continue(false) => {} + } + } } - let pg_bin_dir_path = conf.pg_bin_dir(pg_version).map_err(|e| { - Error::new( - ErrorKind::Other, - format!("incorrect pg_bin_dir path: {}", e), - ) - })?; - let pg_lib_dir_path = conf.pg_lib_dir(pg_version).map_err(|e| { - Error::new( - ErrorKind::Other, - format!("incorrect pg_lib_dir path: {}", e), + + Ok(()) + } + .instrument(info_span!("wal-redo ctrl", %tenant_id)); + + (handle, ipc) +} + +/// Proof of concept lockless multiprocess control loop. +/// +/// Scales up too fast, and scales up and down too fast. +#[derive(Default)] +struct Control { + js: tokio::task::JoinSet<()>, + cancel: CancellationToken, + downsized_at_limit: bool, + successive_empty: usize, +} + +impl Control { + /// Do a control loop iteration. + /// + /// Returns a break value when control loop should be stopped, Continue(true) when new process + /// should be spawned and Continue(false) normally. + async fn iteration( + &mut self, + feedback: &Arc, + watcher: &mut std::pin::Pin<&mut F>, + have_first: bool, + max_processes: &NonZeroUsize, + ) -> std::ops::ControlFlow<(), bool> + where + F: std::future::Future, + { + use std::ops::ControlFlow::*; + + let check_not_empty = self.successive_empty > 0; + + let check_full = !self.js.is_empty() + && self.js.len() < max_processes.get() + && !self.cancel.is_cancelled(); + + tokio::select! { + // preconditions: + // - we are under the limit but have one + // - we haven't been cancelled + _ = feedback.notify_full.notified(), if check_full => { + if self.downsized_at_limit { + // we most likely had been pinged earlier during heavy load that the + // channel is full. however, we have been disabled once the limit was + // reached, so we are only now getting this old notification, so just + // clear it but don't start any process since we just downsized. + self.downsized_at_limit = false; + return Continue(false); + } + + self.successive_empty = 0; + + // similar to downscaling this will be notified when the buffer is full but + // can react to it by spawning many + + let mut forgotten_permits = 0; + while let Ok(permit) = feedback.downsize.try_acquire() { + permit.forget(); + forgotten_permits += 1; + } + + if forgotten_permits > 0 { + warn!(forgotten_permits, "this should really have a test case for it, but we just forgot permits while adding a new process"); + } + + Continue(true) + }, + _ = feedback.notify_not_empty.notified(), if check_not_empty => { + self.successive_empty = 0; + Continue(false) + }, + _ = feedback.notify_empty.notified(), if self.js.len() > 1 => { + self.downsized_at_limit |= self.js.len() >= max_processes.get(); + // let one exit + // TODO: this lets many since all of them will notify at same time ... + // could debounce to some tick rate? + feedback.downsize.add_permits(1); + self.successive_empty = 0; + Continue(false) + }, + _ = feedback.notify_empty.notified(), if self.js.len() == 1 => { + self.successive_empty += 1; + if self.successive_empty > 5 { + // if we have been idle for 5 timeouts, then we might as well kill the + // last process and go to awaiting for the next work in this task. + feedback.downsize.add_permits(1); + } + Continue(false) + }, + exited = self.js.join_next() => { + match exited { + Some(Ok(())) => { + // normal, logging happens in walredo_rpc + Continue(false) + }, + Some(Err(e)) => { + // hook should had printed already + warn!("walredo task panicked: {e}"); + Continue(false) + }, + None => { + if self.cancel.is_cancelled() { + // on shutdown, now the children have all exited, so should we. + Break(()) + } else if have_first { + // we are yet to launch our first process + Continue(true) + } else { + debug!("all wal redo processes have been stopped, waiting for the next redo request before launching new process"); + Break(()) + } + } + } + } + _ = watcher, if !self.cancel.is_cancelled() => { + // we don't need to watch our cancellation/shutdown because the tasks do, + // however we do need the guidance for the scale up + self.cancel.cancel(); + Continue(false) + } + } + } +} + +#[instrument(skip(conf, tenant_id))] +async fn launch_walredo( + conf: &PageServerConf, + tenant_id: TenantId, + pg_version: u32, + nth_tenant_process: usize, +) -> anyhow::Result { + let datadir = path_with_suffix_extension( + conf.tenant_path(&tenant_id) + .join(format!("wal-redo-datadir-{nth_tenant_process}")), + TEMP_FILE_SUFFIX, + ); + + match tokio::fs::remove_dir_all(&datadir).await { + Ok(()) => { + info!("removed existing data directory: {}", datadir.display()); + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + other => other.with_context(|| { + format!( + "Failed to cleanup existing wal-redo-datadir at {}", + datadir.display() ) - })?; - - info!("running initdb in {}", datadir.display()); - let initdb = Command::new(pg_bin_dir_path.join("initdb")) - .args(&["-D", &datadir.to_string_lossy()]) - .arg("-N") - .env_clear() - .env("LD_LIBRARY_PATH", &pg_lib_dir_path) - .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) // macOS - .close_fds() - .output() - .map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?; - - if !initdb.status.success() { - return Err(Error::new( - ErrorKind::Other, - format!( - "initdb failed\nstdout: {}\nstderr:\n{}", - String::from_utf8_lossy(&initdb.stdout), - String::from_utf8_lossy(&initdb.stderr) - ), - )); + })?, + } + + let pg_bin_dir_path = conf.pg_bin_dir(pg_version)?; + let pg_lib_dir_path = conf.pg_lib_dir(pg_version)?; + + info!("running initdb in {}", datadir.display()); + + let initdb = tokio::process::Command::new(pg_bin_dir_path.join("initdb")) + .arg("-D") + .arg(&datadir) + .arg("-N") + .env_clear() + .env("LD_LIBRARY_PATH", &pg_lib_dir_path) + .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) + .close_fds() + .output() + .await + .context("Failed to execute initdb for wal-redo")?; + + anyhow::ensure!( + initdb.status.success(), + "initdb failed\nstdout: {}\nstderr:\n {}", + String::from_utf8_lossy(&initdb.stdout), + String::from_utf8_lossy(&initdb.stderr) + ); + + info!("starting walredo process"); + + tokio::process::Command::new(pg_bin_dir_path.join("postgres")) + .arg("--wal-redo") + .stdin(Stdio::piped()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .env_clear() + .env("LD_LIBRARY_PATH", &pg_lib_dir_path) + .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) + .env("PGDATA", &datadir) + .close_fds() + // best effort is probably good enough for us + .kill_on_drop(true) + .spawn() + .context("postgres --wal-redo command failed to start") +} + +struct Feedback { + capacity: usize, + waiting_work: AtomicUsize, + notify_full: tokio::sync::Notify, + notify_not_empty: tokio::sync::Notify, + notify_empty: tokio::sync::Notify, + downsize: tokio::sync::Semaphore, +} + +impl Feedback { + fn new(outer_capacity: usize) -> Self { + Feedback { + capacity: outer_capacity, + waiting_work: AtomicUsize::default(), + notify_full: tokio::sync::Notify::default(), + notify_not_empty: tokio::sync::Notify::default(), + notify_empty: tokio::sync::Notify::default(), + downsize: tokio::sync::Semaphore::new(0), + } + } + + fn inc_waiting_work(&self) { + let ordering = std::sync::atomic::Ordering::Relaxed; + let now = self.waiting_work.fetch_add(1, ordering); + + // because of relaxed atomics, we can see the increments and decrements in whatever order, + // the count will sometimes be wrapped around + if now == self.capacity - 1 { + self.notify_full.notify_one(); } else { - // Limit shared cache for wal-redo-postgres - let mut config = OpenOptions::new() - .append(true) - .open(PathBuf::from(&datadir).join("postgresql.conf"))?; - config.write_all(b"shared_buffers=128kB\n")?; - config.write_all(b"fsync=off\n")?; - } - - // Start postgres itself - let child = Command::new(pg_bin_dir_path.join("postgres")) - .arg("--wal-redo") - .stdin(Stdio::piped()) - .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .env_clear() - .env("LD_LIBRARY_PATH", &pg_lib_dir_path) - .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) - .env("PGDATA", &datadir) - // The redo process is not trusted, and runs in seccomp mode that - // doesn't allow it to open any files. We have to also make sure it - // doesn't inherit any file descriptors from the pageserver, that - // would allow an attacker to read any files that happen to be open - // in the pageserver. - // - // The Rust standard library makes sure to mark any file descriptors with - // as close-on-exec by default, but that's not enough, since we use - // libraries that directly call libc open without setting that flag. - .close_fds() - .spawn_no_leak_child() - .map_err(|e| { - Error::new( - e.kind(), - format!("postgres --wal-redo command failed to start: {}", e), - ) - })?; - - let mut child = scopeguard::guard(child, |child| { - error!("killing wal-redo-postgres process due to a problem during launch"); - child.kill_and_wait(); - }); + self.notify_not_empty.notify_one(); + } + } - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); + fn dec_waiting_work(&self) { + let ordering = std::sync::atomic::Ordering::Relaxed; + self.waiting_work.fetch_sub(1, ordering); + } - macro_rules! set_nonblock_or_log_err { - ($file:ident) => {{ - let res = set_nonblock($file.as_raw_fd()); - if let Err(e) = &res { - error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed"); - } - res - }}; - } - set_nonblock_or_log_err!(stdin)?; - set_nonblock_or_log_err!(stdout)?; - set_nonblock_or_log_err!(stderr)?; - - // all fallible operations post-spawn are complete, so get rid of the guard - let child = scopeguard::ScopeGuard::into_inner(child); - - Ok(PostgresRedoProcess { - tenant_id, - child, - stdin, - stdout, - stderr, - }) + fn should_downsize(&self) -> bool { + if let Ok(permit) = self.downsize.try_acquire() { + // this downsizing permit has now been "used", more permits need to be + // added by the controller to downsize later processes + permit.forget(); + true + } else { + self.notify_empty.notify_one(); + false + } } +} + +#[allow(clippy::too_many_arguments)] +async fn walredo_rpc( + mut child: tokio::process::Child, + work_tx: flume::Sender, + work_rx: flume::Receiver, + expected_inflight: usize, + initial: Option, + cancel: CancellationToken, + feedback: Arc, +) { + let pid = child + .id() + .expect("pid is present before killing the process"); + + info!("Launched wal-redo process: {pid}"); + + // we send the external the request in different commands + let stdin = child.stdin.take().expect("not taken yet"); + + // stdout is used to communicate the resulting page + let stdout = child.stdout.take().expect("not taken yet"); + + #[cfg(not_needed)] + { + use std::os::unix::io::AsRawFd; + + nix::fcntl::fcntl( + stdout.as_raw_fd(), + nix::fcntl::FcntlArg::F_SETPIPE_SZ(8192 * 4), + ) + .unwrap(); + + nix::fcntl::fcntl( + stdin.as_raw_fd(), + nix::fcntl::FcntlArg::F_SETPIPE_SZ(1048576), + ) + .unwrap(); + + let pipe_sizes = + nix::fcntl::fcntl(stdin.as_raw_fd(), nix::fcntl::FcntlArg::F_GETPIPE_SZ).unwrap(); + let pipe_sizes = ( + pipe_sizes, + nix::fcntl::fcntl(stdout.as_raw_fd(), nix::fcntl::FcntlArg::F_GETPIPE_SZ).unwrap(), + ); - #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))] - fn kill(self) { - self.child.kill_and_wait(); + debug!(stdin = pipe_sizes.0, stdout = pipe_sizes.1, "pipe sizes"); } - // - // Apply given WAL records ('records') over an old page image. Returns - // new page image. - // - #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.child.id()))] - fn apply_wal_records( - &mut self, - tag: BufferTag, - base_img: Option, - records: &[(Lsn, NeonWalRecord)], - wal_redo_timeout: Duration, - ) -> Result { - // Serialize all the messages to send the WAL redo process first. - // - // This could be problematic if there are millions of records to replay, - // but in practice the number of records is usually so small that it doesn't - // matter, and it's better to keep this code simple. + // used to communicate hopefully utf-8 log messages + let stderr = child.stderr.take().expect("not taken yet"); + + let (inter_tx, inter_rx) = tokio::sync::mpsc::channel(expected_inflight); + + let stdin_task = stdin_task(stdin, work_tx, work_rx, inter_tx, initial, feedback); + + let stdin_task = async { + tokio::pin!(stdin_task); + + // because stdin and stdout are interconnected with a channel to faciliate + // pipelining, we don't need to cancel all of the tasks, just the "feeder" task. // - // Most requests start with a before-image with BLCKSZ bytes, followed by - // by some other WAL records. Start with a buffer that can hold that - // comfortably. - let mut writebuf: Vec = Vec::with_capacity((BLCKSZ as usize) * 3); - build_begin_redo_for_block_msg(tag, &mut writebuf); - if let Some(img) = base_img { - build_push_page_msg(tag, &img, &mut writebuf); - } - for (lsn, rec) in records.iter() { - if let NeonWalRecord::Postgres { - will_init: _, - rec: postgres_rec, - } = rec - { - build_apply_record_msg(*lsn, postgres_rec, &mut writebuf); - } else { - return Err(Error::new( - ErrorKind::Other, - "tried to pass neon wal record to postgres WAL redo", - )); - } - } - build_get_page_msg(tag, &mut writebuf); - WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); - - // The input is now in 'writebuf'. Do a blind write first, writing as much as - // we can, before calling poll(). That skips one call to poll() if the stdin is - // already available for writing, which it almost certainly is because the - // process is idle. - let mut nwrite = self.stdin.write(&writebuf)?; - - // We expect the WAL redo process to respond with an 8k page image. We read it - // into this buffer. - let mut resultbuf = vec![0; BLCKSZ.into()]; - let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far - - // Prepare for calling poll() - let mut pollfds = [ - PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN), - PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN), - PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT), - ]; - - // We do three things simultaneously: send the old base image and WAL records to - // the child process's stdin, read the result from child's stdout, and forward any logging - // information that the child writes to its stderr to the page server's log. - while nresult < BLCKSZ.into() { - // If we have more data to write, wake up if 'stdin' becomes writeable or - // we have data to read. Otherwise only wake up if there's data to read. - let nfds = if nwrite < writebuf.len() { 3 } else { 2 }; - let n = loop { - match nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32) { - Err(e) if e == nix::errno::Errno::EINTR => continue, - res => break res, + // using tokio::try_join! later we get the wanted "channel close or shutdown takes + // down all tasks after finishing the in-flight work" + tokio::select! { + ret = &mut stdin_task => ret, + _ = cancel.cancelled() => { + // instead of cancelling all of the tasks at once by: + // + // Err("tenant shutdown, cancelling walredo work") + // + // just "close" channel connecting stdin and stdout tasks so that + // whatever got through, will be processed. + // + // walredo process might end up seeing one more request we will not + // read the response to, but it's most likely killed at that point + // anyways. + Ok(()) + } + } + }; + + let stdout_task = stdout_task(stdout, inter_rx); + + let stderr_task = stderr_task(stderr); + + async move { + tokio::pin!(stdin_task); + tokio::pin!(stdout_task); + tokio::pin!(stderr_task); + + let mut stdin_alive = true; + let mut stderr_alive = true; + + loop { + tokio::select! { + res = &mut stdin_task, if stdin_alive => { + stdin_alive = false; + if let Err(e) = res { + warn!("stdin task failed: {e:?}"); + break; + } + // don't stop polling on stdin task exit, because we might still have pipelined + // waiting for stdout task to complete them. + }, + res = &mut stdout_task => { + if let Err(e) = res { + warn!("stdout task failed: {e:?}"); + } + // always break after stdout task exits, as stderr never/rarely exits + break; + }, + res = &mut stderr_task, if stderr_alive => { + stderr_alive = false; + if let Err(e) = res { + warn!("stderr task failed: {e:?}"); + break; + } + }, + }; + } + + // only reason why this would fail is that it has exited already, which is + // possible by manually killing it outside of pageserver or by it dying while + // processing. + let killed = child.start_kill().is_ok(); + + match child.wait().await { + Ok(status) => { + if !status.success() { + if killed { + // the status will never be success when we kill, at least on unix + debug!(?status, "wal-redo process did not exit successfully as pageserver killed it"); + } else { + // situations like killing it from outside manually + warn!(?status, "wal-redo process did not exit successfully but pageserver did not kill it"); + }; } - }?; + } + Err(e) => { + error!("failed to wait for child process to exit: {e}"); + } + } - if n == 0 { - return Err(Error::new(ErrorKind::Other, "WAL redo timed out")); + // FIXME: now that the child has exited, we could bulldoze the pgdatadir + // this would be sensible at least in multiprocess > 1 situation, making + // startup times faster + + info!("task exiting"); + } + .instrument(info_span!("wal-redo", pid)) + .await +} + +/// Payload from stdin task to stdout task for completion carrying the response channel and the +/// deadline for reading from stdout. +type IntermediatePayload = (flume::Sender>, tokio::time::Instant); + +#[derive(Debug)] +enum StdinTaskError { + IntermediateChannelClosed, +} + +/// Returns a human readable reason for stopping. Channel being closed or shutdown are turned into +/// `Ok(())`. +#[allow(clippy::too_many_arguments)] +#[instrument(name = "stdin", skip_all)] +async fn stdin_task( + mut stdin: AW, + work_tx: flume::Sender, + work_rx: flume::Receiver, + inter_tx: tokio::sync::mpsc::Sender, + mut initial: Option, + feedback: Arc, +) -> Result<(), StdinTaskError> +where + AW: tokio::io::AsyncWrite + Unpin, +{ + use futures::future::{poll_fn, TryFutureExt}; + use std::pin::Pin; + use tokio::io::AsyncWriteExt; + use tokio_util::io::poll_write_buf; + + let mut buffers = BufQueue::default(); + + let have_vectored_stdin = stdin.is_write_vectored(); + + let mut scratch = BytesMut::with_capacity(if have_vectored_stdin { + // without vectoring we aim at 3 messages: begin, page, records + get_page, + // with vectoring this will be very much enough + 1024 * 16 + } else { + // without vectoring, use full buffer + 1024 * 64 + }); + + let work_timeout = Duration::from_secs(1); + + loop { + let ((request, response), reservation) = { + // when running in a multiprocess configuration, we must acquire the write side before + // trying to acquire more work from the shared queue, otherwise we'd be stuck holding + // the global work while we've overcommited this one process. + let reservation = match inter_tx.reserve().await { + Ok(r) => r, + Err(_send_error) => { + if let Some(mut initial) = initial { + if !initial.0.inc_requeued() { + drop(initial.1.send(Err(anyhow::anyhow!( + "Failed to start the walredo process too many times" + )))); + } else { + // re-queue (while reordering) the initial work item + drop(work_tx.send_async(initial).await); + } + } + + return Err(StdinTaskError::IntermediateChannelClosed); + } + }; + + let next = initial.take(); + let next = if next.is_none() { + // shutdown is handled outside of this task + match tokio::time::timeout(work_timeout, work_rx.recv_async()).await { + Ok(m) => m.ok(), + Err(_timeout) => { + // we are in a good place to exit, so do it + if feedback.should_downsize() { + // returning None here will exit from the task, and so we can teardown + // this process + None + } else { + continue; + } + } + } + } else { + next + }; + + match next { + Some(t) => { + feedback.dec_waiting_work(); + (t, reservation) + } + None => { + // the Handle or the request queue sender have been dropped; return Ok(()) to keep + // processing any of already pipelined requests. + // + // the drop however does never seem to happen, but most likely the task is + // descheduled because of a shutdown_watcher. + return Ok(()); + } } + }; - // If we have some messages in stderr, forward them to the log. - let err_revents = pollfds[1].revents().unwrap(); - if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { - let mut errbuf: [u8; 16384] = [0; 16384]; - let n = self.stderr.read(&mut errbuf)?; + let timeout_at = tokio::time::Instant::now() + request.timeout; - // The message might not be split correctly into lines here. But this is - // good enough, the important thing is to get the message to the log. - if n > 0 { - error!( - "wal-redo-postgres: {}", - String::from_utf8_lossy(&errbuf[0..n]) - ); + let records = request.records_range.sub_slice(&request.records); + + if have_vectored_stdin { + build_vectored_messages( + request.target, + request.base_img, + records, + &mut scratch, + &mut buffers, + ); + } else { + build_messages( + request.target, + request.base_img, + records, + &mut scratch, + &mut buffers, + ); + } + + let write_res = async { + while buffers.has_remaining() { + poll_fn(|cx| poll_write_buf(Pin::new(&mut stdin), cx, &mut buffers)) + .await + .map_err(anyhow::Error::new)?; + } + // in general flush is not needed, does nothing on pipes, but since we now accept + // AsyncWrite + stdin.flush().await.map_err(anyhow::Error::new) + }; - // To make sure we capture all log from the process if it fails, keep - // reading from the stderr, before checking the stdout. - continue; + let write_res = tokio::time::timeout_at(timeout_at, write_res) + .map_err(|_| anyhow::anyhow!("write timeout")); + + // wait the write to complete before sending the completion over, because we cannot fail + // the request after it has been moved. this could be worked around by making the oneshot + // into a normal mpsc queue of size 2 and making the read side race against the channel + // closing and timeouted read. + match write_res.await.and_then(|x| x) { + Ok(()) => { + // by writing and then completing later we achieve request pipelining with the + // walredo process. some workloads fit well into the default 64KB buffer, so we + // have an opportunity to keep walredo busy by buffering the requests. + + if inter_tx.is_closed() { + drop(response.send(Err(anyhow::anyhow!( + "Failed to read walredo response: stdout task closed already" + )))); + return Err(StdinTaskError::IntermediateChannelClosed); + } else { + reservation.send((response, timeout_at)); } - } else if err_revents.contains(PollFlags::POLLHUP) { - return Err(Error::new( - ErrorKind::BrokenPipe, - "WAL redo process closed its stderr unexpectedly", - )); - } - - // If we have more data to write and 'stdin' is writeable, do write. - if nwrite < writebuf.len() { - let in_revents = pollfds[2].revents().unwrap(); - if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { - nwrite += self.stdin.write(&writebuf[nwrite..])?; - } else if in_revents.contains(PollFlags::POLLHUP) { - // We still have more data to write, but the process closed the pipe. - return Err(Error::new( - ErrorKind::BrokenPipe, - "WAL redo process closed its stdin unexpectedly", - )); + } + Err(io_or_timeout) => { + // TODO: could we somehow manage to keep the request in case we need to + // restart the process? see https://github.com/neondatabase/neon/issues/1700 + // we could send it back to the queue at least here, but not so easily when it's + // inflight. + + if let Err(e) = + response.send(Err(io_or_timeout).context("Failed to write request to wal-redo")) + { + let e = e.into_inner().expect_err("just created the value"); + // log this here, even though we could return an error + warn!("stopping due to io or timeout error: {e:#}"); } + // we can still continue processing pipelined requests, if any. the + // stdout task will exit upon seeing we've dropped the result_txs. + return Ok(()); } + } + } +} + +#[derive(Debug)] +enum StdoutTaskError { + ReadFailed(std::io::Error), + StdoutClosed, + ReadTimeout, +} - // If we have some data in stdout, read it to the result buffer. - let out_revents = pollfds[0].revents().unwrap(); - if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { - nresult += self.stdout.read(&mut resultbuf[nresult..])?; - } else if out_revents.contains(PollFlags::POLLHUP) { - return Err(Error::new( - ErrorKind::BrokenPipe, - "WAL redo process closed its stdout unexpectedly", - )); +/// Always eventually fails, returning a developer friendly reason. +#[instrument(name = "stdout", skip_all)] +async fn stdout_task( + mut stdout: AR, + mut inter_rx: tokio::sync::mpsc::Receiver, +) -> Result<(), StdoutTaskError> +where + AR: tokio::io::AsyncRead + Unpin, +{ + use tokio::io::AsyncReadExt; + + async fn read_page(mut stdout: AR, buffer: &mut Vec) -> Result<(), StdoutTaskError> + where + AR: tokio::io::AsyncRead + Unpin, + { + loop { + let read = stdout + .read_buf(buffer) + .await + .map_err(StdoutTaskError::ReadFailed)?; + if read == 0 { + return Err(StdoutTaskError::StdoutClosed); } + if buffer.len() < 8192 { + continue; + } + return Ok(()); } + } - Ok(Bytes::from(resultbuf)) + // the extra past one block is for normally reading and clearing out the readyness for the + // stdout. tokio does not expose an api on ChildStdout for this, so we need to have this + // "hack". this buffer is reused between reads. + let mut page_buf = Vec::with_capacity(8192 + 4096); + + loop { + let (completion, timeout_at) = match inter_rx.recv().await { + Some(t) => t, + None => return Ok(()), + }; + + let res = tokio::time::timeout_at(timeout_at, read_page(&mut stdout, &mut page_buf)) + .await + .map_err(|_elapsed| StdoutTaskError::ReadTimeout) + .and_then(|x| x); + + match res { + Ok(()) => { + let page = Bytes::copy_from_slice(&page_buf[..8192]); + // we don't care about the result, because the caller could be gone + drop(completion.into_send_async(Ok(page)).await); + page_buf.drain(..8192); + } + Err(e @ StdoutTaskError::ReadFailed(_)) => { + drop(completion.send(Err(anyhow::anyhow!( + "Failed to read response from wal-redo" + )))); + return Err(e); + } + Err(e @ StdoutTaskError::StdoutClosed) => { + drop(completion.send(Err(anyhow::anyhow!( + "Failed to read response from wal-redo" + )))); + return Err(e); + } + Err(e @ StdoutTaskError::ReadTimeout) => { + drop(completion.send(Err(anyhow::anyhow!( + "Timed out while waiting for the page from wal-redo" + )))); + return Err(e); + } + } } } -/// Wrapper type around `std::process::Child` which guarantees that the child -/// will be killed and waited-for by this process before being dropped. -struct NoLeakChild { - child: Option, +#[derive(Debug)] +enum StderrTaskError { + ReadFailed(std::io::Error), + Closed, } -impl Deref for NoLeakChild { - type Target = Child; - - fn deref(&self) -> &Self::Target { - self.child.as_ref().expect("must not use from drop") +#[instrument(name = "stderr", skip_all)] +async fn stderr_task(stderr: AR) -> Result<(), StderrTaskError> +where + AR: tokio::io::AsyncRead, +{ + use tokio::io::AsyncBufReadExt; + let stderr = tokio::io::BufReader::new(stderr); + let mut buffer = Vec::new(); + + tokio::pin!(stderr); + + loop { + buffer.clear(); + match stderr.read_until(b'\n', &mut buffer).await { + Ok(0) => return Err(StderrTaskError::Closed), + Ok(read) => { + let message = String::from_utf8_lossy(&buffer[..read]); + error!("wal-redo-process: {}", message.trim()); + } + Err(e) => { + return Err(StderrTaskError::ReadFailed(e)); + } + } } } -impl DerefMut for NoLeakChild { - fn deref_mut(&mut self) -> &mut Self::Target { - self.child.as_mut().expect("must not use from drop") +/// Serializes the wal redo request into `buffers` with the help of scratch buffer `scratch`. +/// +/// The request is combination of `B + P? + A* + G`. +/// +/// Compared to [`build_vectored_messages`], this implementation builds at most 3 messages if the +/// base version of page is included (it's never copied to conserve the "scratch" space). +/// +/// All of the messages have structure of `command + len + payload?`, where: +/// - `command` is a single ascii character `[BPAG]` +/// - `len` is payload length + length of len (4) +/// - payload is command specific +fn build_messages( + target: BufferTag, + base_img: Option, + records: &[(Lsn, NeonWalRecord)], + scratch: &mut BytesMut, + buffers: &mut BufQueue, +) { + let need = message_length( + base_img.is_some(), + usize::MAX, + records.iter().map(|(_lsn, r)| match r { + NeonWalRecord::Postgres { rec, .. } => rec.len(), + _ => unreachable!(), + }), + ); + + scratch.reserve(need); + + let tag = { + target.ser_into(&mut scratch.writer()).unwrap(); + scratch.split().freeze() + }; + + build_begin_message(&tag, scratch); + + if let Some(page) = base_img { + assert_eq!(page.len(), 8192); + build_push_page_header(&tag, scratch); + + let out = scratch.split().freeze(); + + buffers.push(out); + buffers.push(page); + } + + for (end_lsn, record) in records { + let (_will_init, rec) = match record { + NeonWalRecord::Postgres { will_init, rec } => (will_init, rec), + _ => unreachable!(), + }; + + build_apply_record_header(end_lsn, rec.len() as u32, scratch); + scratch.put(rec.clone()); } + + build_get_page_message(&tag, scratch); + + let out = scratch.split().freeze(); + buffers.push(out); +} + +/// Calculates the buffer space needed for the messages produced out of this request. +#[allow(clippy::identity_op)] +fn message_length(base_img: bool, copy_walrecord_threshold: usize, records: I) -> usize +where + I: Iterator, +{ + let tag_len = BufferTag::LEN as usize; + + let begin = 1 + 4 + tag_len; + let page = if base_img { + 1 + 4 + tag_len + /* let kernel copy page */ 0 + } else { + 0 + }; + + let records = records + .map(|rec_len| { + 1 + 4 + + 8 + + if rec_len < copy_walrecord_threshold { + rec_len + } else { + 0 + } + }) + .sum::(); + + let get_page = 1 + 4 + tag_len; + + begin + page + records + get_page } -impl NoLeakChild { - fn spawn(command: &mut Command) -> io::Result { - let child = command.spawn()?; - Ok(NoLeakChild { child: Some(child) }) +/// Compared to [`build_messages`] builds many small messages and aiming for vectored write +/// handling the gathering of the already allocated records. +fn build_vectored_messages( + target: BufferTag, + base_img: Option, + records: &[(Lsn, NeonWalRecord)], + scratch: &mut BytesMut, + buffers: &mut BufQueue, +) { + let copy_walrecord_threshold = 1024; + + let need = message_length( + base_img.is_some(), + copy_walrecord_threshold, + records.iter().map(|(_lsn, r)| match r { + NeonWalRecord::Postgres { rec, .. } => rec.len(), + _ => unreachable!(), + }), + ); + + scratch.reserve(need); + + let tag = { + target.ser_into(&mut scratch.writer()).unwrap(); + scratch.split().freeze() + }; + + build_begin_message(&tag, scratch); + + if let Some(page) = base_img { + build_push_page_header(&tag, scratch); + buffers.push(scratch.split().freeze()); + buffers.push(page); } - fn kill_and_wait(mut self) { - let child = match self.child.take() { - Some(child) => child, - None => return, + for (end_lsn, record) in records { + let rec = match record { + NeonWalRecord::Postgres { rec, .. } => rec, + _ => unreachable!(), }; - Self::kill_and_wait_impl(child); - } - - #[instrument(skip_all, fields(pid=child.id()))] - fn kill_and_wait_impl(mut child: Child) { - let res = child.kill(); - if let Err(e) = res { - // This branch is very unlikely because: - // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it. - // - This is the only place that calls .kill() - // - We consume `self`, so, .kill() can't be called twice. - // - If the process exited by itself or was killed by someone else, - // .kill() will still succeed because we haven't wait()'ed yet. - // - // So, if we arrive here, we have really no idea what happened, - // whether the PID stored in self.child is still valid, etc. - // If this function were fallible, we'd return an error, but - // since it isn't, all we can do is log an error and proceed - // with the wait(). - error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process"); - } - - match child.wait() { - Ok(exit_status) => { - info!(exit_status = %exit_status, "wait successful"); - } - Err(e) => { - error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)"); - } + + let record_len = rec.len() as u32; + build_apply_record_header(end_lsn, record_len, scratch); + if rec.len() < copy_walrecord_threshold { + scratch.put(rec.clone()); + } else { + buffers.push(scratch.split().freeze()); + buffers.push(rec.clone()); } } + + build_get_page_message(&tag, scratch); + buffers.push(scratch.split().freeze()); +} + +fn build_begin_message(tag: &Bytes, scratch: &mut BytesMut) { + scratch.put_u8(b'B'); + scratch.put_u32(4 + BufferTag::LEN); + scratch.put(tag.clone()); +} + +fn build_push_page_header(tag: &Bytes, scratch: &mut BytesMut) { + let page_len = 8192; + scratch.put_u8(b'P'); + scratch.put_u32(4 + BufferTag::LEN + page_len); + scratch.put(tag.clone()); +} + +fn build_apply_record_header(end_lsn: &Lsn, record_len: u32, scratch: &mut BytesMut) { + scratch.put_u8(b'A'); + scratch.put_u32(4 + 8 + record_len); + scratch.put_u64(end_lsn.0); +} + +fn build_get_page_message(tag: &Bytes, scratch: &mut BytesMut) { + scratch.put_u8(b'G'); + scratch.put_u32(4 + BufferTag::LEN); + scratch.put(tag.clone()); +} + +type Payload = (Request, flume::Sender>); + +/// WAL Redo request +struct Request { + target: BufferTag, + base_img: Option, + records: Arc<[(Lsn, NeonWalRecord)]>, + records_range: SliceRange, + timeout: std::time::Duration, + pg_version: u32, + requeues: u8, } -impl Drop for NoLeakChild { - fn drop(&mut self) { - let child = match self.child.take() { - Some(child) => child, - None => return, +impl Request { + /// Returns true, if this should be requeued or false, if too many requeuing have happened + /// already. + fn inc_requeued(&mut self) -> bool { + self.requeues = if let Some(remaining) = self.requeues.checked_add(1) { + remaining + } else { + return false; }; - // Offload the kill+wait of the child process into the background. - // If someone stops the runtime, we'll leak the child process. - // We can ignore that case because we only stop the runtime on pageserver exit. - BACKGROUND_RUNTIME.spawn(async move { - tokio::task::spawn_blocking(move || { - Self::kill_and_wait_impl(child); - }) - .await - }); + + self.requeues < 8 } } -trait NoLeakChildCommandExt { - fn spawn_no_leak_child(&mut self) -> io::Result; +#[derive(Clone)] +struct Handle { + tx: flume::Sender, + feedback: Arc, } -impl NoLeakChildCommandExt for Command { - fn spawn_no_leak_child(&mut self) -> io::Result { - NoLeakChild::spawn(self) +impl Handle { + fn request_redo(&self, request: Request) -> anyhow::Result { + let (result_tx, result_rx) = flume::bounded(1); + + self.tx + .send((request, result_tx)) + .map_err(|_| anyhow::anyhow!("Failed to communicate with the walredo task"))?; + + self.feedback.inc_waiting_work(); + + result_rx + .recv() + .context("Failed to get a WAL Redo'd page back") + .and_then(|x| x) } } -// Functions for constructing messages to send to the postgres WAL redo -// process. See pgxn/neon_walredo/walredoproc.c for -// explanation of the protocol. +enum SliceRange { + InclusiveExclusive(std::ops::Range), + RangeFrom(std::ops::RangeFrom), +} -fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec) { - let len = 4 + 1 + 4 * 4; +impl From> for SliceRange { + fn from(r: std::ops::Range) -> Self { + SliceRange::InclusiveExclusive(r) + } +} - buf.put_u8(b'B'); - buf.put_u32(len as u32); +impl From> for SliceRange { + fn from(r: std::ops::RangeFrom) -> Self { + SliceRange::RangeFrom(r) + } +} - tag.ser_into(buf) - .expect("serialize BufferTag should always succeed"); +impl SliceRange { + fn sub_slice<'a, T>(&self, full_slice: &'a [T]) -> &'a [T] { + match self { + SliceRange::InclusiveExclusive(r) => &full_slice[r.start..r.end], + SliceRange::RangeFrom(r) => &full_slice[r.start..], + } + } } -fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec) { - assert!(base_img.len() == 8192); +/// Conceptually `Vec` masquerading as `bytes::Buf`. +/// +/// Used to build vectorized writes, in case we have a base page. +/// +/// Adapted from https://github.com/tokio-rs/bytes/pull/371 +struct BufQueue { + bufs: std::collections::VecDeque, + remaining: usize, +} + +impl Default for BufQueue { + fn default() -> Self { + Self { + bufs: std::collections::VecDeque::with_capacity(4), + remaining: 0, + } + } +} - let len = 4 + 1 + 4 * 4 + base_img.len(); +impl BufQueue { + fn push(&mut self, buffer: Bytes) { + let rem = buffer.remaining(); + if rem != 0 { + self.bufs.push_back(buffer); + self.remaining = self.remaining.checked_add(rem).expect("remaining overflow"); + } + } - buf.put_u8(b'P'); - buf.put_u32(len as u32); - tag.ser_into(buf) - .expect("serialize BufferTag should always succeed"); - buf.put(base_img); + #[allow(unused)] + fn clear(&mut self) { + self.bufs.clear(); + self.remaining = 0; + } } -fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec) { - let len = 4 + 8 + rec.len(); +impl bytes::Buf for BufQueue { + fn remaining(&self) -> usize { + self.remaining + } - buf.put_u8(b'A'); - buf.put_u32(len as u32); - buf.put_u64(endlsn.0); - buf.put(rec); + fn chunk(&self) -> &[u8] { + match self.bufs.front() { + Some(b) => b.chunk(), + None => &[], + } + } + + fn chunks_vectored<'a>(&'a self, mut dst: &mut [std::io::IoSlice<'a>]) -> usize { + let mut n = 0; + + for b in &self.bufs { + if dst.is_empty() { + break; + } + let next = b.chunks_vectored(dst); + dst = &mut dst[next..]; + n += next; + } + + n + } + + fn advance(&mut self, mut cnt: usize) { + while cnt != 0 { + let front = self.bufs.front_mut().expect("mut not be empty"); + let rem = front.remaining(); + let advance = std::cmp::min(cnt, rem); + front.advance(advance); + if rem == advance { + self.bufs.pop_front().unwrap(); + } + cnt -= advance; + self.remaining -= advance; + } + } } -fn build_get_page_msg(tag: BufferTag, buf: &mut Vec) { - let len = 4 + 1 + 4 * 4; +#[cfg(test)] +mod tests { + use super::{PostgresRedoManager, WalRedoManager}; + use crate::repository::Key; + use crate::{config::PageServerConf, walrecord::NeonWalRecord}; + use bytes::Bytes; + use std::num::NonZeroUsize; + use std::str::FromStr; + use utils::{id::TenantId, lsn::Lsn}; + + #[test] + fn short_v14_redo() { + // prettier than embedding the 8192 bytes here though most of it are zeroes + // PRE-MERGE: zstd would cut it to 263 bytes, consider in review? + let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap(); + + let h = RedoHarness::new().unwrap(); + let page = h + .manager + .request_redo( + Key { + field1: 0, + field2: 1663, + field3: 13010, + field4: 1259, + field5: 0, + field6: 0, + }, + Lsn::from_str("0/16E2408").unwrap(), + None, + short_records(), + 14, + ) + .unwrap(); + + assert_eq!(&expected, &*page); + } + + #[test] + fn short_v14_fails_for_wrong_key_but_returns_zero_page() { + let h = RedoHarness::new().unwrap(); + + let page = h + .manager + .request_redo( + Key { + field1: 0, + field2: 1663, + // key should be 13010 + field3: 13130, + field4: 1259, + field5: 0, + field6: 0, + }, + Lsn::from_str("0/16E2408").unwrap(), + None, + short_records(), + 14, + ) + .unwrap(); - buf.put_u8(b'G'); - buf.put_u32(len as u32); - tag.ser_into(buf) - .expect("serialize BufferTag should always succeed"); + // TODO: there will be some stderr printout, which is forwarded to tracing that could + // perhaps be captured as long as it's in the same thread. + assert_eq!(page, crate::ZERO_PAGE); + } + + #[tokio::test] + async fn shutdown_via_taskmgr() { + let h = RedoHarness::new().unwrap(); + + let _page = h + .manager + .request_redo( + Key { + field1: 0, + field2: 1663, + field3: 13010, + field4: 1259, + field5: 0, + field6: 0, + }, + Lsn::from_str("0/16E2408").unwrap(), + None, + short_records(), + 14, + ) + .unwrap(); + + crate::task_mgr::shutdown_tasks( + Some(crate::task_mgr::TaskKind::WalRedo), + Some(h.tenant_id), + None, + ) + .await; + } + + #[tokio::test] + async fn stdout_reads_two_pages() { + // stdout reads 8192 + 1 bytes at time, this test makes sure we see all bytes in that case, + // because cursor can be read for 8193 and 8191 bytes. + let mut buffer = Vec::with_capacity(8192 * 2); + buffer.extend(std::iter::repeat(0).take(8192)); + buffer.push(1); + buffer.extend((2usize..).map(|x| x as u8).take(8192 - 1)); + let buffer = std::io::Cursor::new(buffer); + + let (tx, rx) = tokio::sync::mpsc::channel(2); + let (ret_tx1, ret_rx1) = flume::bounded(1); + let (ret_tx2, ret_rx2) = flume::bounded(1); + + tx.send((ret_tx1, bogus_deadline())).await.unwrap(); + tx.send((ret_tx2, bogus_deadline())).await.unwrap(); + + // important to drop the sender, since we currently wait for the next result_rx before + // trying to read (when we would read 0) + drop(tx); + + super::stdout_task(buffer, rx) + .await + .expect("stdout should return ok since all requests were processed"); + + let page = ret_rx1.recv_async().await.unwrap().unwrap(); + let page = page.slice(..); + assert!(page.iter().all(|&x| x == 0)); + assert_eq!(page.len(), 8192); + + let page = ret_rx2.recv_async().await.unwrap().unwrap(); + let page = page.slice(..); + assert!(page.iter().enumerate().all(|(i, &x)| x == (i + 1) as u8)); + assert_eq!(page.len(), 8192); + } + + #[tokio::test] + async fn stdout_task_fails_on_closed_stdout() { + use super::{stdout_task, StdoutTaskError}; + + let buffer = tokio::io::empty(); + + let (tx, rx) = tokio::sync::mpsc::channel(1); + let (ret_tx, ret_rx) = flume::bounded(1); + + tx.send((ret_tx, bogus_deadline())).await.unwrap(); + + let e = stdout_task(buffer, rx) + .await + .expect_err("task should fail because nothing could be read"); + assert!(matches!(e, StdoutTaskError::StdoutClosed), "{e:?}"); + + ret_rx + .recv_async() + .await + .unwrap() + .expect_err("response should also be error"); + } + + fn bogus_deadline() -> tokio::time::Instant { + tokio::time::Instant::now() + std::time::Duration::from_secs(2) + } + + #[tokio::test] + async fn scale_down_to_zero_breaks() { + use super::{Control, Feedback}; + use futures::future::pending; + use std::num::NonZeroUsize; + use std::ops::ControlFlow::*; + use std::sync::Arc; + use tokio::sync::Notify; + // this was a bug in early implementation, that the control loop would just hang + let feedback = Arc::new(Feedback::new(4)); + let watcher = pending::<()>(); + tokio::pin!(watcher); + + let mut control = Control::default(); + let max_processes = NonZeroUsize::new(1).unwrap(); + let mut initial = Some(()); + + let ret = control + .iteration(&feedback, &mut watcher, initial.is_some(), &max_processes) + .await; + + // because we have the initial, we should always start it first + assert_eq!(Continue(true), ret); + initial.take(); + + let fake_walredo = Arc::new(Notify::new()); + + control.js.spawn({ + let fake_walredo = fake_walredo.to_owned(); + async move { + fake_walredo.notified().await; + } + }); + + // after spawning there's no progress until some activated event comes (in this case, only + // empty, or joinset events) + feedback.notify_not_empty.notify_one(); + feedback.notify_empty.notify_one(); + + let ret = control + .iteration(&feedback, &mut watcher, initial.is_some(), &max_processes) + .await; + + assert_eq!(Continue(false), ret); + + // the downsizing should eventually happen as the empty notifications keep coming + // should_downsize forgets the one permit on returning true + while !feedback.should_downsize() { + let ret = control + .iteration(&feedback, &mut watcher, initial.is_some(), &max_processes) + .await; + + assert_eq!(Continue(false), ret); + } + + // next round should have this exiting, leading to exiting from the loop + fake_walredo.notify_one(); + + let ret = control + .iteration(&feedback, &mut watcher, initial.is_some(), &max_processes) + .await; + + // this return logic could be complicated, but the js.join_next() will return instantly when + // all tasks have exited + assert_eq!( + Continue(false), + ret, + "first one after exit will be Continue(false)" + ); + + let ret = control + .iteration(&feedback, &mut watcher, initial.is_some(), &max_processes) + .await; + + assert_eq!( + Break(()), + ret, + "Need to break out of the control loop to start waiting a new first item" + ); + } + + #[tokio::test] + async fn scale_up_and_down() { + use super::{Control, Feedback}; + use futures::future::pending; + use std::num::NonZeroUsize; + use std::ops::ControlFlow::*; + use std::sync::Arc; + use tokio::sync::Notify; + + let feedback = Arc::new(Feedback::new(4)); + let watcher = pending::<()>(); + tokio::pin!(watcher); + + let mut control = Control::default(); + let max_processes = NonZeroUsize::new(4).unwrap(); + let mut initial = Some(()); + + let ret = control + .iteration(&feedback, &mut watcher, initial.is_some(), &max_processes) + .await; + + assert_eq!(Continue(true), ret); + initial.take(); + + let mut fakes = Vec::new(); + + loop { + assert!(fakes.len() < max_processes.get()); + let fake_walredo = Arc::new(Notify::new()); + + control.js.spawn({ + let fake_walredo = fake_walredo.to_owned(); + async move { + fake_walredo.notified().await; + } + }); + + fakes.push(fake_walredo); + + if fakes.len() == max_processes.get() { + // since the event is deactivated when max processes is reached, the next iteration + // just sleeps + break; + } + + // then we get a flurry of requests + + feedback.notify_full.notify_one(); + // there are no other notifications in stable overload situation + + let ret = control + .iteration(&feedback, &mut watcher, initial.is_some(), &max_processes) + .await; + + match ret { + // spawn more + Continue(true) => continue, + other => unreachable!("unexpected during scale up: {other:?}"), + } + } + + println!("scaled up"); + + assert_eq!(fakes.len(), max_processes.get()); + + // generate one event since we cannot have these created for us by normal queue reads + feedback.notify_empty.notify_one(); + + loop { + let ret = control + .iteration(&feedback, &mut watcher, initial.is_some(), &max_processes) + .await; + + if feedback.should_downsize() { + assert_eq!(Continue(false), ret); + let fake = fakes + .pop() + .expect("there should not be more downsizing than there are processes"); + fake.notify_one(); + } else if fakes.is_empty() { + // this is a continue, because the process hasn't yet shown up from js.join_next() + assert_eq!(Continue(false), ret); + break; + } else { + // maybe more signals are needed to downsize + // (a notification is already sent by should_downsize) + } + } + + let ret = control + .iteration(&feedback, &mut watcher, initial.is_some(), &max_processes) + .await; + + // finally we should exit as all processes have exited and scaled down. + assert_eq!(Break(()), ret); + } + + #[allow(clippy::octal_escapes)] + fn short_records() -> Vec<(Lsn, NeonWalRecord)> { + vec![ + ( + Lsn::from_str("0/16A9388").unwrap(), + NeonWalRecord::Postgres { + will_init: true, + rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01") + } + ), + ( + Lsn::from_str("0/16D4080").unwrap(), + NeonWalRecord::Postgres { + will_init: false, + rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0") + } + ) + ] + } + + struct RedoHarness { + // underscored because unused, except for removal at drop + _repo_dir: tempfile::TempDir, + manager: PostgresRedoManager, + tenant_id: TenantId, + // FIXME: this needs a own tokio reactor to use the same api + } + + impl RedoHarness { + fn new() -> anyhow::Result { + let repo_dir = tempfile::tempdir()?; + let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); + let conf = Box::leak(Box::new(conf)); + let tenant_id = TenantId::generate(); + + let manager = + PostgresRedoManager::multiprocess(conf, tenant_id, NonZeroUsize::new(1).unwrap()); + + Ok(RedoHarness { + _repo_dir: repo_dir, + manager, + tenant_id, + }) + } + } } diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 6b3324b7a76b..2a3bd10e4506 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -215,7 +215,8 @@ def prepare_snapshot( # Remove wal-redo temp directory for tenant in (repo_dir / "tenants").glob("*"): - shutil.rmtree(tenant / "wal-redo-datadir.___temp") + for walredo in tenant.glob("wal-redo-datadir*.___temp"): + shutil.rmtree(walredo) # Update paths and ports in config files pageserver_toml = repo_dir / "pageserver.toml" diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index e50a559a4bbf..8e3d4ad822db 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -23,6 +23,7 @@ fail = { version = "0.5", default-features = false, features = ["failpoints"] } futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] } futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] } futures-util = { version = "0.3", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] } +getrandom = { version = "0.2", default-features = false, features = ["js", "js-sys", "rdrand", "std", "wasm-bindgen"] } hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] } indexmap = { version = "1", default-features = false, features = ["std"] } libc = { version = "0.2", features = ["extra_traits", "std"] }