WIP: Process received GetPage requests in parallel
hlinnaka authored and koivunej committed Dec 7, 2022
1 parent e4e6174 commit 9eec603
Showing 1 changed file with 104 additions and 64 deletions.
168 changes: 104 additions & 64 deletions pageserver/src/page_service.rs
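The core of the change: instead of handling each Pagestream request to completion before reading the next one, the handler parses a request, spawns a task for it with tokio::spawn, and pushes the JoinHandle onto a futures::stream::FuturesOrdered. A tokio::select! loop then both drains completed tasks, writing their responses back to the client, and reads new requests, admitting new work only while fewer than MAX_INFLIGHT_REQUESTS tasks are in flight. FuturesOrdered yields results in the order the futures were pushed, so responses still reach the client in request order even though the work runs concurrently. Below is a minimal, self-contained sketch of that pattern with the Pagestream plumbing stubbed out; the u32 "requests", the sleep standing in for page reconstruction, and the crate setup (tokio with the full feature set plus futures 0.3) are illustrative assumptions, not code from the commit.

```rust
use futures::stream::{FuturesOrdered, StreamExt};
use std::time::Duration;

// Cap on concurrently processed requests, mirroring MAX_INFLIGHT_REQUESTS in the commit.
const MAX_INFLIGHT_REQUESTS: usize = 4;

#[tokio::main]
async fn main() {
    // Stand-in for requests read from the connection; the real code gets
    // these from pgb.read_message() as CopyData messages.
    let mut pending: Vec<u32> = (1..=10u32).rev().collect();

    let mut inprogress = FuturesOrdered::new();
    loop {
        tokio::select! {
            // Drain finished tasks; FuturesOrdered yields them in the order
            // they were pushed, so "responses" keep the request order.
            Some(done) = inprogress.next(), if !inprogress.is_empty() => {
                let reply: u32 = done.expect("task panicked");
                println!("sent response for request {reply}");
            }
            // Admit a new request only while below the in-flight cap.
            Some(req) = async { pending.pop() }, if inprogress.len() < MAX_INFLIGHT_REQUESTS => {
                inprogress.push_back(tokio::spawn(async move {
                    // Pretend to reconstruct a page.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    req
                }));
            }
            // Nothing pending and nothing in flight: we are done.
            else => break,
        }
    }
}
```

Two related details of the diff follow from this structure: FuturesOrdered rather than FuturesUnordered is what keeps responses in request order, presumably because the client pairs responses with requests positionally; and since a histogram timer can no longer simply wrap the handler call, each spawned task records an Instant at the start and returns the elapsed seconds together with a RequestType tag, which the main loop uses to observe the matching metric when it writes the response.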
@@ -12,6 +12,7 @@
use anyhow::{bail, ensure, Context, Result};
use bytes::Buf;
use bytes::Bytes;
use futures::stream::FuturesOrdered;
use futures::{Stream, StreamExt};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -26,6 +27,7 @@ use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use tokio::pin;
use tokio_util::io::StreamReader;
use tokio_util::io::SyncIoBridge;
@@ -56,6 +58,9 @@ use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;

/// Number of requests to process in parallel, from a single connection
const MAX_INFLIGHT_REQUESTS: usize = 4;

fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Bytes>> + '_ {
async_stream::try_stream! {
loop {
@@ -226,6 +231,13 @@ struct PageRequestMetrics {
get_db_size: metrics::Histogram,
}

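/// Tag returned by each spawned request task so the main loop knows which
/// latency histogram to observe when it sends the response.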
pub enum RequestType {
Exists,
Nblocks,
GetPage,
DbSize,
}

impl PageRequestMetrics {
fn new(tenant_id: &TenantId, timeline_id: &TimelineId) -> Self {
let tenant_id = tenant_id.to_string();
@@ -300,67 +312,100 @@ impl PageServerHandler {

let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id);

//
// Main loop to handle the stream of requests
//
// We process multiple requests in parallel by spawning a new task for each
// incoming request.
let mut inprogress_requests = FuturesOrdered::new();
loop {
let msg = tokio::select! {
tokio::select! {
biased;

// If we were requested to shut down, stop
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
break;
}

msg = pgb.read_message() => { msg }
};

let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};
// When a task completes, send the response to the client
completed_task = inprogress_requests.next(), if !inprogress_requests.is_empty() => {
let response: Bytes;
let request_type: RequestType;
let elapsed_sec: f64;
(response, request_type, elapsed_sec) = completed_task.unwrap()?;
pgb.write_message(&BeMessage::CopyData(&response))?;
pgb.flush().await?;

trace!("query: {copy_data_bytes:?}");
match request_type {
RequestType::Exists => metrics.get_rel_exists.observe(elapsed_sec),
RequestType::Nblocks => metrics.get_rel_size.observe(elapsed_sec),
RequestType::GetPage => metrics.get_page_at_lsn.observe(elapsed_sec),
RequestType::DbSize => metrics.get_db_size.observe(elapsed_sec),
}

// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}
continue;
}

let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
// When a new request arrives, spawn a task to process it.
// If we already have MAX_INFLIGHT_REQUESTS requests in progress, however,
// don't start new ones.
msg = pgb.read_message(), if inprogress_requests.len() < MAX_INFLIGHT_REQUESTS => {
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(m) => {
bail!("unexpected message: {m:?} during COPY");
}
None => break, // client disconnected
};

let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let _timer = metrics.get_rel_exists.start_timer();
self.handle_get_rel_exists_request(&timeline, &req).await
}
PagestreamFeMessage::Nblocks(req) => {
let _timer = metrics.get_rel_size.start_timer();
self.handle_get_nblocks_request(&timeline, &req).await
}
PagestreamFeMessage::GetPage(req) => {
let _timer = metrics.get_page_at_lsn.start_timer();
self.handle_get_page_at_lsn_request(&timeline, &req).await
}
PagestreamFeMessage::DbSize(req) => {
let _timer = metrics.get_db_size.start_timer();
self.handle_db_size_request(&timeline, &req).await
trace!("query: {copy_data_bytes:?}");

// Trace request if needed
if let Some(t) = tracer.as_mut() {
t.trace(&copy_data_bytes)
}

let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;

let timeline = Arc::clone(&timeline);
let conf = self.conf;
let task = async move {
let start_time = Instant::now();
let (response, request_type) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
(Self::handle_get_rel_exists_request(&timeline, &req).await,
RequestType::Exists)
}
PagestreamFeMessage::Nblocks(req) => {
(Self::handle_get_nblocks_request(&timeline, &req).await,
RequestType::Nblocks)
}
PagestreamFeMessage::GetPage(req) => {
(Self::handle_get_page_at_lsn_request(conf, &timeline, &req).await,
RequestType::GetPage)
}
PagestreamFeMessage::DbSize(req) => {
(Self::handle_db_size_request(&timeline, &req).await,
RequestType::DbSize)
}
};

let response = response.unwrap_or_else(|e| {
// print all the details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});
let response: Bytes = response.serialize();
(response, request_type, start_time.elapsed().as_secs_f64())
};
inprogress_requests.push_back(tokio::spawn(task));
continue;
}
};

let response = response.unwrap_or_else(|e| {
// print all the details to the log with {:#}, but for the client the
// error message is enough
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
});

pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
pgb.flush().await?;
}
Ok(())
}
@@ -532,9 +577,8 @@ impl PageServerHandler {
Ok(lsn)
}

#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline,
req: &PagestreamExistsRequest,
) -> Result<PagestreamBeMessage> {
Expand All @@ -549,9 +593,8 @@ impl PageServerHandler {
}))
}

#[instrument(skip(self, timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_nblocks_request(
&self,
timeline: &Timeline,
req: &PagestreamNblocksRequest,
) -> Result<PagestreamBeMessage> {
Expand All @@ -566,9 +609,8 @@ impl PageServerHandler {
}))
}

#[instrument(skip(self, timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
#[instrument(skip(timeline, req), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
async fn handle_db_size_request(
&self,
timeline: &Timeline,
req: &PagestreamDbSizeRequest,
) -> Result<PagestreamBeMessage> {
Expand All @@ -586,11 +628,11 @@ impl PageServerHandler {
}))
}

#[instrument(skip(self, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
req: &PagestreamGetPageRequest,
#[instrument(skip(conf, timeline, req), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request<'a>(
conf: &'static PageServerConf,
timeline: &'a Timeline,
req: &'a PagestreamGetPageRequest,
) -> Result<PagestreamBeMessage> {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)
Expand All @@ -607,17 +649,16 @@ impl PageServerHandler {
// FIXME: this profiling now happens at different place than it used to. The
// current profiling is based on a thread-local variable, so it doesn't work
// across awaits
let _profiling_guard = profpoint_start(self.conf, ProfilingConfig::PageRequests);
let _profiling_guard = profpoint_start(conf, ProfilingConfig::PageRequests);
let page = timeline.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest)?;

Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
}

#[instrument(skip(self, pgb))]
#[instrument(skip(pgb))]
async fn handle_basebackup_request(
&self,
pgb: &mut PostgresBackend,
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -750,8 +791,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
};

// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false)
.await?;
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false).await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
// return pair of prev_lsn and last_lsn
@@ -810,7 +850,7 @@ impl postgres_backend_async::Handler for PageServerHandler {
self.check_permission(Some(tenant_id))?;

// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
Self::handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true)
.await?;
pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {
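A knock-on effect of spawning the handlers is that they can no longer take &self: a future handed to tokio::spawn must be 'static, so it cannot borrow the connection handler. The diff therefore turns handle_get_rel_exists_request, handle_get_nblocks_request, handle_db_size_request and handle_get_page_at_lsn_request into associated functions, moves an Arc<Timeline> clone into each task, passes the &'static PageServerConf explicitly where profiling needs it, and drops `self` from the #[instrument] skip lists; the basebackup handler gets the same treatment, which is why its call sites switch to Self::handle_basebackup_request. A small sketch of that ownership pattern, using a hypothetical Timeline stand-in rather than the real pageserver types:

```rust
use std::sync::Arc;

// Hypothetical stand-in for the pageserver's Timeline; only the ownership
// pattern matters here.
struct Timeline {
    id: u32,
}

// An associated-function style handler: no &self, everything it touches is
// owned or 'static, so the future it returns can be moved into tokio::spawn.
async fn handle_request(timeline: Arc<Timeline>, blkno: u32) -> String {
    format!("timeline {} block {}", timeline.id, blkno)
}

#[tokio::main]
async fn main() {
    let timeline = Arc::new(Timeline { id: 1 });

    // Clone the Arc per request so each spawned task owns its own reference.
    let task = tokio::spawn(handle_request(Arc::clone(&timeline), 42));
    println!("{}", task.await.unwrap());
}
```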
