
Object_store: get_file and put_file #5281

Draft: wants to merge 1 commit into master

Conversation

@troychiu commented Jan 5, 2024

Which issue does this PR close?

Closes #5277.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@github-actions bot added the object-store (Object Store Interface) label on Jan 5, 2024
@troychiu (Author) commented Jan 5, 2024

Hi @tustvold,
I am relatively new to Rust, and I am wondering if you could give me some suggestions on this PR. I have implemented the functionality, but I ran into two issues.

  1. Error handling: For some errors, I added new variants to the Error enum. I am wondering if that's the correct way to do it. Also, should I do the same thing for the multipart writer?
  2. Downloading is slow: I compared this Rust implementation to my Python implementation and found that the Rust implementation here is relatively slow. Could you help me check if there is any obvious flaw in my implementation?

Thank you for your time.

@@ -1082,6 +1097,58 @@ fn convert_walkdir_result(
}
}


/// Download a remote object to a local [`File`]
Contributor:

I think these docstrings are the wrong way round.

Author (@troychiu):

Thank you for the reminder.

@@ -1082,6 +1097,58 @@ fn convert_walkdir_result(
}
}


/// Download a remote object to a local [`File`]
pub async fn upload(store: &dyn ObjectStore, location: &Path, opts: PutOptions, file: &mut std::fs::File) -> Result<()> {
Contributor:

Could the bounds on file be any looser? It might be nice to have any impl Read, although you'd then need to supply the length or additionally use + Seek (which would be less efficient for finding length than a File would be). You could make a container struct which just contains a reader and its total length, then give easy ways of constructing that from a known length, or impl Read + Seek, or a File (although I think you couldn't use TryFrom without specialization as a File is also Read + Seek).

Could also return the number of bytes written rather than an empty tuple, just in case it's useful to anyone (more useful if the source is an arbitrary readable).
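Roughly, such a container might look like this (names are hypothetical and it's untested, just to show the shape):

use std::fs::File;
use std::io::{Read, Seek, SeekFrom};

/// A reader paired with its total length in bytes.
pub struct SizedReader<R> {
    pub reader: R,
    pub length: u64,
}

impl<R: Read> SizedReader<R> {
    /// Wrap a reader whose total length is already known.
    pub fn new(reader: R, length: u64) -> Self {
        Self { reader, length }
    }
}

impl<R: Read + Seek> SizedReader<R> {
    /// Discover the length by seeking to the end and back
    /// (slower than asking a File for its metadata).
    pub fn from_seekable(mut reader: R) -> std::io::Result<Self> {
        let length = reader.seek(SeekFrom::End(0))?;
        reader.seek(SeekFrom::Start(0))?;
        Ok(Self { reader, length })
    }
}

impl SizedReader<File> {
    /// Files can report their length cheaply from metadata.
    pub fn from_file(file: File) -> std::io::Result<Self> {
        let length = file.metadata()?.len();
        Ok(Self { reader: file, length })
    }
}

Named constructors like these sidestep the TryFrom overlap mentioned above.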

@clbarnes (Contributor), Jan 5, 2024:

I suppose an AsyncRead (and AsyncWrite for below) might be better, so that other functions can run while waiting for that IO.

Contributor:

The difficulty with AsyncRead / AsyncWrite is that there isn't actually an efficient way to implement them: tokio::fs::File calls spawn_blocking for every operation, which is hopelessly inefficient.

@clbarnes (Contributor), Jan 5, 2024:

Yes, because most file systems don't actually have an async API. I guess it's a balance of that overhead vs having this function block in the middle.

Another possibility would be for these upload and download functions to be replaced with something like

pub async fn copy_between_stores(
    src_store: &dyn ObjectStore, src_location: &Path, get_opts: GetOptions,
    tgt_store: &dyn ObjectStore, tgt_location: &Path, put_opts: PutOptions,
) -> Result<usize> {...}

and if either store is a local file system, that's fine. Then we re-use any optimisations we have elsewhere in the crate and it's more flexible, if a bit more verbose.
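A naive sketch of the body (untested, buffering the whole object in memory, and assuming the crate's get_opts / put_opts methods), just to show the shape:

use object_store::{path::Path, GetOptions, ObjectStore, PutOptions, Result};

pub async fn copy_between_stores(
    src_store: &dyn ObjectStore, src_location: &Path, get_opts: GetOptions,
    tgt_store: &dyn ObjectStore, tgt_location: &Path, put_opts: PutOptions,
) -> Result<usize> {
    // Naive: pull the whole object into memory, then push it back out.
    // A real version would stream / use multipart uploads to bound memory.
    let bytes = src_store.get_opts(src_location, get_opts).await?.bytes().await?;
    let len = bytes.len();
    tgt_store.put_opts(tgt_location, bytes, put_opts).await?;
    Ok(len)
}

A real implementation would need to stream rather than buffer, which is part of what makes supporting this properly non-trivial.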

Contributor:

I've filed #5284 for this; I think I would rather keep the scope of this specific PR down, as supporting that properly is very subtle and not easy.



/// Upload a local [`File`] to a remote object store
pub async fn download(store: &dyn ObjectStore, location: &Path, opts: GetOptions, file: &mut File) -> Result<()> {
@clbarnes (Contributor), Jan 5, 2024:

Same as above, could File be impl (Async)Write? And it could return the number of bytes written.

Contributor:

I think the intention of using File directly is to handle the specifics that arise from filesystems, e.g. the need for buffering and the lack of async APIs (unless you count platform-specific APIs like io_uring).

@clbarnes (Contributor) commented Jan 5, 2024

> 2. found the Rust implementation here is relatively slow

This could be in the file-writing step; I believe Python does some buffering under the hood, whereas Rust blocks until the bytes are actually on the file system. Wrapping the File in a BufWriter (or its async equivalent) might help.

It's also worth noting that the upload and download functions both run in serial (albeit concurrently with other async functions): you wait to read bytes from the file, then you wait to upload them, then you wait to read the next bytes, wait to upload them, etc. I suspect it's possible to have a thread pre-loading a queue with chunks read from the file (which you can limit in size to prevent RAM explosion for large files), and another thread reading from that queue to handle the upload (or vice versa, downloading chunks from the store and writing them), so that you can do your web IO and local IO at the same time.
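On the buffering point, a rough sketch (untested, and using the public Error::Generic just for illustration rather than the PR's new error variants) of the download loop with the File wrapped in a BufWriter:

use std::fs::File;
use std::io::{BufWriter, Write};

use futures::StreamExt;
use object_store::{path::Path, GetOptions, ObjectStore};

async fn download_buffered(
    store: &dyn ObjectStore,
    location: &Path,
    opts: GetOptions,
    file: &mut File,
) -> object_store::Result<()> {
    let mut stream = store.get_opts(location, opts).await?.into_stream();
    // BufWriter coalesces many small chunk writes into fewer, larger syscalls.
    let mut writer = BufWriter::new(file);
    while let Some(bytes) = stream.next().await {
        writer
            .write_all(&bytes?)
            .map_err(|e| object_store::Error::Generic {
                store: "local",
                source: Box::new(e),
            })?;
    }
    // Flush explicitly: dropping a BufWriter attempts a flush but silently ignores errors.
    writer.flush().map_err(|e| object_store::Error::Generic {
        store: "local",
        source: Box::new(e),
    })?;
    Ok(())
}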

Comment on lines +1140 to +1147
let mut stream = get_result.into_stream();

while let Some(bytes_result) = stream.next().await {
    let bytes = bytes_result?;
    file.write_all(&bytes).map_err(|e| Error::UnableToWriteBytesToFile{
        source: e.into()
    })?;
}
Contributor:

As written this will perform blocking IO on the current thread; you probably want to spawn_blocking the IO task and separate it from the producer with an mpsc channel or something. I can try to get an example of this up later today.

Author (@troychiu):

That would be great if you can provide an example. Thank you!

Contributor:

Sorry, I ran out of time today; I'll try to get you something on Monday.

Contributor:

Sorry, I've had a crazy few days; something like this should work (not at all tested):

diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index e985ff070c..0969731f32 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -155,6 +155,11 @@ pub(crate) enum Error {
     InvalidPath {
         path: String,
     },
+
+    #[snafu(display("Unable to download data to file: {}", source))]
+    Download {
+        source: io::Error,
+    },
 }
 
 impl From<Error> for super::Error {
@@ -1093,6 +1098,37 @@ fn convert_walkdir_result(
     }
 }
 
+pub async fn download(
+    store: &dyn ObjectStore,
+    location: &Path,
+    opts: GetOptions,
+    file: &mut File,
+) -> Result<()> {
+    let (mut sender, mut receiver) = tokio::sync::mpsc::channel(2);
+    let mut download = store.get_opts(location, opts).await?.into_stream();
+    let forwarder = async move {
+        while let Some(n) = download.next().await.transpose()? {
+            if sender.send(n).await.is_err() {
+                break;
+            }
+        }
+        Ok::<_, crate::Error>(())
+    };
+
+    let mut captured = file.try_clone().context(DownloadSnafu)?;
+    // Perform the blocking file writes via `maybe_spawn_blocking`,
+    // draining the channel fed by the async forwarder above.
+    let writer = maybe_spawn_blocking(move || {
+        while let Some(b) = receiver.blocking_recv() {
+            captured.write_all(&b).context(DownloadSnafu)?;
+        }
+        Ok(())
+    });
+
+    let _ = futures::future::try_join(forwarder, writer).await?;
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Comment on lines +1114 to +1116
file.read_to_end(&mut buffer).map_err(|e| Error::UnableToReadBytesFromFile{
    source: e
})?;
Contributor:

Suggested change
file.read_to_end(&mut buffer).map_err(|e| Error::UnableToReadBytesFromFile{
    source: e
})?;
file.read_to_end(&mut buffer).context(UnableToReadBytesFromFileSnafu)?;
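For reference, snafu's ResultExt::context wraps the io::Error into the variant generated by the corresponding *Snafu selector, doing the same job as the manual map_err. A minimal standalone illustration (not the PR's actual Error enum):

use std::io::Read;

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Unable to read bytes from file: {}", source))]
    UnableToReadBytesFromFile { source: std::io::Error },
}

fn read_all(file: &mut std::fs::File) -> Result<Vec<u8>, Error> {
    let mut buffer = Vec::new();
    // `.context(...)` converts the io::Error into the UnableToReadBytesFromFile
    // variant, replacing the manual map_err closure above.
    file.read_to_end(&mut buffer)
        .context(UnableToReadBytesFromFileSnafu)?;
    Ok(buffer)
}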

Labels: object-store (Object Store Interface)
Linked issue: Object_store: put_file and get_file methods
3 participants