From 6c3f9a299ad10a66160563f7580a1062c2e8af7e Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Sun, 31 Jul 2022 21:44:32 +0100 Subject: [PATCH] Add LimitStore (#2175) (#2242) * Add LimitStore (#2175) * Review feedback * Fix test --- object_store/src/aws.rs | 1 + object_store/src/lib.rs | 1 + object_store/src/limit.rs | 263 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 265 insertions(+) create mode 100644 object_store/src/limit.rs diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs index 89a2185128b..cedd4651e54 100644 --- a/object_store/src/aws.rs +++ b/object_store/src/aws.rs @@ -684,6 +684,7 @@ impl AmazonS3Builder { /// Sets the maximum number of concurrent outstanding /// connectons. Default is `16`. + #[deprecated(note = "use LimitStore instead")] pub fn with_max_connections(mut self, max_connections: NonZeroUsize) -> Self { self.max_connections = max_connections; self diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 54d28273fa9..33e8452d064 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -45,6 +45,7 @@ pub mod aws; pub mod azure; #[cfg(feature = "gcp")] pub mod gcp; +pub mod limit; pub mod local; pub mod memory; pub mod path; diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs new file mode 100644 index 00000000000..fd21ccb58d7 --- /dev/null +++ b/object_store/src/limit.rs @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! An object store that limits the maximum concurrency of the wrapped implementation + +use crate::{ + BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, Result, + StreamExt, +}; +use async_trait::async_trait; +use bytes::Bytes; +use futures::Stream; +use std::io::{Error, IoSlice}; +use std::ops::Range; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::io::AsyncWrite; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; + +/// Store wrapper that wraps an inner store and limits the maximum number of concurrent +/// object store operations. Where each call to an [`ObjectStore`] member function is +/// considered a single operation, even if it may result in more than one network call +/// +/// ``` +/// # use object_store::memory::InMemory; +/// # use object_store::limit::LimitStore; +/// +/// // Create an in-memory `ObjectStore` limited to 20 concurrent requests +/// let store = LimitStore::new(InMemory::new(), 20); +/// ``` +/// +#[derive(Debug)] +pub struct LimitStore { + inner: T, + max_requests: usize, + semaphore: Arc, +} + +impl LimitStore { + /// Create new limit store that will limit the maximum + /// number of outstanding concurrent requests to + /// `max_requests` + pub fn new(inner: T, max_requests: usize) -> Self { + Self { + inner, + max_requests, + semaphore: Arc::new(Semaphore::new(max_requests)), + } + } +} + +impl std::fmt::Display for LimitStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "LimitStore({}, {})", self.max_requests, self.inner) + } +} + +#[async_trait] +impl ObjectStore for LimitStore { + async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.put(location, bytes).await + } + + async fn put_multipart( + &self, + location: &Path, + ) -> Result<(MultipartId, Box)> { + let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); + let (id, write) = self.inner.put_multipart(location).await?; + Ok((id, Box::new(PermitWrapper::new(write, permit)))) + } + + async fn abort_multipart( + &self, + location: &Path, + multipart_id: &MultipartId, + ) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.abort_multipart(location, multipart_id).await + } + + async fn get(&self, location: &Path) -> Result { + let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); + match self.inner.get(location).await? { + r @ GetResult::File(_, _) => Ok(r), + GetResult::Stream(s) => { + Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed())) + } + } + } + + async fn get_range(&self, location: &Path, range: Range) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.get_range(location, range).await + } + + async fn head(&self, location: &Path) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.head(location).await + } + + async fn delete(&self, location: &Path) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.delete(location).await + } + + async fn list( + &self, + prefix: Option<&Path>, + ) -> Result>> { + let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); + let s = self.inner.list(prefix).await?; + Ok(PermitWrapper::new(s, permit).boxed()) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.copy(from, to).await + } + + async fn rename(&self, from: &Path, to: &Path) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.rename(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.copy_if_not_exists(from, to).await + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.rename_if_not_exists(from, to).await + } +} + +/// Combines an [`OwnedSemaphorePermit`] with some other type +struct PermitWrapper { + inner: T, + #[allow(dead_code)] + permit: OwnedSemaphorePermit, +} + +impl PermitWrapper { + fn new(inner: T, permit: OwnedSemaphorePermit) -> Self { + Self { inner, permit } + } +} + +impl Stream for PermitWrapper { + type Item = T::Item; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl AsyncWrite for PermitWrapper { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} + +#[cfg(test)] +mod tests { + use crate::limit::LimitStore; + use crate::memory::InMemory; + use crate::tests::{ + list_uses_directories_correctly, list_with_delimiter, put_get_delete_list, + rename_and_copy, stream_get, + }; + use crate::ObjectStore; + use std::time::Duration; + use tokio::time::timeout; + + #[tokio::test] + async fn limit_test() { + let max_requests = 10; + let memory = InMemory::new(); + let integration = LimitStore::new(memory, max_requests); + + put_get_delete_list(&integration).await.unwrap(); + list_uses_directories_correctly(&integration).await.unwrap(); + list_with_delimiter(&integration).await.unwrap(); + rename_and_copy(&integration).await.unwrap(); + stream_get(&integration).await.unwrap(); + + let mut streams = Vec::with_capacity(max_requests); + for _ in 0..max_requests { + let stream = integration.list(None).await.unwrap(); + streams.push(stream); + } + + let t = Duration::from_millis(20); + + // Expect to not be able to make another request + assert!(timeout(t, integration.list(None)).await.is_err()); + + // Drop one of the streams + streams.pop(); + + // Can now make another request + integration.list(None).await.unwrap(); + } +}