Skip to content

Commit

Permalink
Merge pull request #17552 from danhhz/persist_metrics_s3
Browse files Browse the repository at this point in the history
persist: keep metrics per raw s3 call
  • Loading branch information
danhhz committed Feb 7, 2023
2 parents cadacbc + 0114234 commit 06a2a95
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/persist-client/examples/open_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub async fn run(args: Args) -> Result<(), anyhow::Error> {
{
let metrics_registry = metrics_registry.clone();
info!(
- "serving internal HTTP server on {}",
+ "serving internal HTTP server on http://{}/metrics",
args.internal_http_listen_addr
);
mz_ore::task::spawn(
Expand Down
1 change: 1 addition & 0 deletions src/persist/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mz-proto = { path = "../proto" }
openssl = { version = "0.10.43", features = ["vendored"] }
openssl-sys = { version = "0.9.80", features = ["vendored"] }
postgres-openssl = { git = "https://github.com/MaterializeInc/rust-postgres" }
prometheus = { version = "0.13.3", default-features = false }
prost = { version = "0.11.3", features = ["no-recursion-limit"] }
rand = { version = "0.8.5", features = ["small_rng"] }
serde = { version = "1.0.152", features = ["derive"] }
Expand Down
22 changes: 22 additions & 0 deletions src/persist/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

use mz_ore::metric;
use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry, UIntGauge};
use prometheus::IntCounterVec;

/// Metrics specific to S3Blob's internal workings.
#[derive(Debug, Clone)]
Expand All @@ -19,11 +20,24 @@ pub struct S3BlobMetrics {
pub(crate) operation_attempt_timeouts: IntCounter,
pub(crate) connect_timeouts: IntCounter,
pub(crate) read_timeouts: IntCounter,
pub(crate) get_part: IntCounter,
pub(crate) set_single: IntCounter,
pub(crate) set_multi_create: IntCounter,
pub(crate) set_multi_part: IntCounter,
pub(crate) set_multi_complete: IntCounter,
pub(crate) delete_head: IntCounter,
pub(crate) delete_object: IntCounter,
pub(crate) list_objects: IntCounter,
}

impl S3BlobMetrics {
/// Returns a new [S3BlobMetrics] instance connected to the given registry.
pub fn new(registry: &MetricsRegistry) -> Self {
let operations: IntCounterVec = registry.register(metric!(
name: "mz_persist_s3_operations",
help: "number of raw s3 calls on behalf of Blob interface methods",
var_labels: ["op"],
));
Self {
operation_timeouts: registry.register(metric!(
name: "mz_persist_s3_operation_timeouts",
Expand All @@ -41,6 +55,14 @@ impl S3BlobMetrics {
name: "mz_persist_s3_read_timeouts",
help: "number of timeouts waiting on first response byte from S3",
)),
get_part: operations.with_label_values(&["get_part"]),
set_single: operations.with_label_values(&["set_single"]),
set_multi_create: operations.with_label_values(&["set_multi_create"]),
set_multi_part: operations.with_label_values(&["set_multi_part"]),
set_multi_complete: operations.with_label_values(&["set_multi_complete"]),
delete_head: operations.with_label_values(&["delete_head"]),
delete_object: operations.with_label_values(&["delete_object"]),
list_objects: operations.with_label_values(&["list_objects"]),
}
}
}
Expand Down
48 changes: 33 additions & 15 deletions src/persist/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::metrics::S3BlobMetrics;
/// Configuration for opening an [S3Blob].
#[derive(Clone, Debug)]
pub struct S3BlobConfig {
metrics: S3BlobMetrics,
client: S3Client,
bucket: String,
prefix: String,
Expand Down Expand Up @@ -148,7 +149,10 @@ impl S3BlobConfig {
}

// NB: we must always use the custom sleep impl if we use the timeout marker values
- loader = loader.sleep_impl(MetricsSleep { knobs, metrics });
+ loader = loader.sleep_impl(MetricsSleep {
+ knobs,
+ metrics: metrics.clone(),
+ });
loader = loader.timeout_config(
TimeoutConfig::builder()
// maximum time allowed for a top-level S3 API call (including internal retries)
Expand All @@ -164,6 +168,7 @@ impl S3BlobConfig {

let client = mz_aws_s3_util::new_client(&loader.load().await);
Ok(S3BlobConfig {
metrics,
client,
bucket,
prefix,
Expand Down Expand Up @@ -272,6 +277,7 @@ impl S3BlobConfig {
/// Implementation of [Blob] backed by S3.
#[derive(Debug)]
pub struct S3Blob {
metrics: S3BlobMetrics,
client: S3Client,
bucket: String,
prefix: String,
Expand All @@ -286,6 +292,7 @@ impl S3Blob {
/// Opens the given location for non-exclusive read-write access.
pub async fn open(config: S3BlobConfig) -> Result<Self, ExternalError> {
let ret = S3Blob {
metrics: config.metrics,
client: config.client,
bucket: config.bucket,
prefix: config.prefix,
Expand Down Expand Up @@ -341,6 +348,7 @@ impl Blob for S3Blob {
// the headers before the full data body has completed. This gives us
// the number of parts. We can then proceed to fetch the body of the
// first request concurrently with the rest of the parts of the object.
self.metrics.get_part.inc();
let object = self
.client
.get_object()
Expand Down Expand Up @@ -400,16 +408,16 @@ impl Blob for S3Blob {
for part_num in 2..=num_parts {
// TODO: Add the key and part number once this can be annotated
// with metadata.
- let part_fut = async_runtime.spawn_named(
- || "persist_s3blob_get_header",
+ let part_fut = async_runtime.spawn_named(|| "persist_s3blob_get_header", {
+ self.metrics.get_part.inc();
self.client
.get_object()
.bucket(&self.bucket)
.key(&path)
.part_number(part_num)
.send()
- .map(move |res| (start_headers.elapsed(), res)),
- );
+ .map(move |res| (start_headers.elapsed(), res))
+ });
part_futs.push(part_fut);
}

Expand Down Expand Up @@ -537,6 +545,7 @@ impl Blob for S3Blob {
let strippable_root_prefix = format!("{}/", self.prefix);

loop {
self.metrics.list_objects.inc();
let resp = self
.client
.list_objects_v2()
Expand Down Expand Up @@ -600,6 +609,7 @@ impl Blob for S3Blob {
// deletion. This return value is only used for metrics, so it's
// unfortunate, but fine.
let path = self.get_path(key);
self.metrics.delete_head.inc();
let head_res = self
.client
.head_object()
Expand All @@ -612,6 +622,7 @@ impl Blob for S3Blob {
Err(SdkError::ServiceError(err)) if err.err().is_not_found() => return Ok(None),
Err(err) => return Err(ExternalError::from(anyhow!("s3 delete head err: {}", err))),
};
self.metrics.delete_object.inc();
let _ = self
.client
.delete_object()
Expand All @@ -631,6 +642,7 @@ impl S3Blob {

let value_len = value.len();
let part_span = trace_span!("s3set_single", payload_len = value_len);
self.metrics.set_single.inc();
self.client
.put_object()
.bucket(&self.bucket)
Expand All @@ -657,6 +669,7 @@ impl S3Blob {

// Start the multi part request and get an upload id.
trace!("s3 PutObject multi start {}b", value.len());
self.metrics.set_multi_create.inc();
let upload_res = self
.client
.create_multipart_upload()
Expand Down Expand Up @@ -691,16 +704,19 @@ impl S3Blob {
// TODO: Add the key and part number once this can be annotated
// with metadata.
|| "persist_s3blob_put_part",
- self.client
- .upload_part()
- .bucket(&self.bucket)
- .key(&path)
- .upload_id(upload_id)
- .part_number(part_num as i32)
- .body(ByteStream::from(value.slice(part_range)))
- .send()
- .instrument(part_span)
- .map(move |res| (start_parts.elapsed(), res)),
+ {
+ self.metrics.set_multi_part.inc();
+ self.client
+ .upload_part()
+ .bucket(&self.bucket)
+ .key(&path)
+ .upload_id(upload_id)
+ .part_number(part_num as i32)
+ .body(ByteStream::from(value.slice(part_range)))
+ .send()
+ .instrument(part_span)
+ .map(move |res| (start_parts.elapsed(), res))
+ },
);
part_futs.push((part_num, part_fut));
}
Expand Down Expand Up @@ -759,6 +775,7 @@ impl S3Blob {
// abort_multipart_upload work, but it would be complex and affect perf.
// Let's see how far we can get without it.
let start_complete = Instant::now();
self.metrics.set_multi_complete.inc();
self.client
.complete_multipart_upload()
.bucket(&self.bucket)
Expand Down Expand Up @@ -953,6 +970,7 @@ mod tests {
let config = config.clone();
async move {
let config = S3BlobConfig {
metrics: config.metrics.clone(),
client: config.client.clone(),
bucket: config.bucket.clone(),
prefix: format!("{}/s3_blob_impl_test/{}", config.prefix, path),
Expand Down

0 comments on commit 06a2a95

Please sign in to comment.