Skip to content

Commit

Permalink
[ObjectStore] Add append API impl for LocalFileSystem (apache#3824)
Browse files Browse the repository at this point in the history
* Append Push API

* wasm is not enabled.
  • Loading branch information
metesynnada committed Mar 11, 2023
1 parent c96274a commit 9ce0ebb
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 3 deletions.
2 changes: 1 addition & 1 deletion arrow-csv/src/writer.rs
Expand Up @@ -194,7 +194,7 @@ impl<W: Write> Writer<W> {
}

/// A CSV writer builder
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct WriterBuilder {
/// Optional column delimiter. Defaults to `b','`
delimiter: Option<u8>,
Expand Down
7 changes: 6 additions & 1 deletion object_store/Cargo.toml
Expand Up @@ -37,7 +37,6 @@ itertools = "0.10.1"
parking_lot = { version = "0.12" }
percent-encoding = "2.1"
snafu = "0.7"
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }
tracing = { version = "0.1" }
url = "2.2"
walkdir = "2"
Expand All @@ -57,6 +56,12 @@ aws-types = { version = "0.54", optional = true }
aws-credential-types = { version = "0.54", optional = true }
aws-config = { version = "0.54", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util", "fs"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] }

[features]
cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
Expand Down
139 changes: 138 additions & 1 deletion object_store/src/local.rs
Expand Up @@ -269,7 +269,6 @@ impl Config {
impl ObjectStore for LocalFileSystem {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
let path = self.config.path_to_filesystem(location)?;

maybe_spawn_blocking(move || {
let (mut file, suffix) = new_staged_upload(&path)?;
let staging_path = staged_upload_path(&path, &suffix);
Expand Down Expand Up @@ -313,6 +312,53 @@ impl ObjectStore for LocalFileSystem {
.await
}

/// Opens `location` for appending and returns it as a boxed [`AsyncWrite`].
///
/// The file is created if it does not exist, and existing content is kept
/// (`truncate(false)` + `append(true)`). If opening fails with
/// `ErrorKind::NotFound`, the parent directory chain is created and the
/// open is retried.
///
/// # Errors
///
/// * `UnableToCreateFile` if the resolved path has no parent component.
/// * `UnableToCreateDir` if the parent directories cannot be created.
/// * `UnableToOpenFile` for any other I/O error while opening the file.
/// * `NotImplemented` when compiled for `wasm32`.
async fn append(
    &self,
    location: &Path,
) -> Result<Box<dyn AsyncWrite + Unpin + Send>> {
    // The whole native implementation lives in one cfg-gated block. An
    // attribute placed on the `let` alone only removes that statement and
    // leaves the loop (which needs `path` and `tokio::fs`) in wasm32 builds.
    #[cfg(not(target_arch = "wasm32"))]
    {
        // Resolve the object-store location to a filesystem path.
        let path = self.config.path_to_filesystem(location)?;
        loop {
            let mut options = tokio::fs::OpenOptions::new();

            // Open in append mode, creating the file when it is missing.
            match options
                .truncate(false)
                .append(true)
                .create(true)
                .open(&path)
                .await
            {
                // Opened successfully: hand the file back as a writer.
                Ok(file) => return Ok(Box::new(file)),
                // `NotFound` while creating means a directory on the way
                // to the file is missing.
                Err(err) if err.kind() == ErrorKind::NotFound => {
                    // `parent()` yields `None` when the path has no parent
                    // component; report that as a file-creation failure.
                    let parent = path
                        .parent()
                        .context(UnableToCreateFileSnafu { path: &path, err })?;

                    // Create the parent directory and any missing ancestors.
                    tokio::fs::create_dir_all(parent)
                        .await
                        .context(UnableToCreateDirSnafu { path: parent })?;
                    // Retry the open now that the directories exist.
                    continue;
                }
                // Any other failure is reported as `UnableToOpenFile`.
                Err(source) => {
                    return Err(Error::UnableToOpenFile { source, path }.into())
                }
            }
        }
    }
    #[cfg(target_arch = "wasm32")]
    Err(super::Error::NotImplemented)
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let path = self.config.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
Expand Down Expand Up @@ -1305,3 +1351,94 @@ mod tests {
integration.list_with_delimiter(Some(&path)).await.unwrap();
}
}

/// Tests for `LocalFileSystem::append`, which is only implemented on
/// non-wasm32 targets.
///
/// Every test flushes its writer before reading the file back:
/// `tokio::fs::File` finishes writes on a background task, so `write_all`
/// returning does not by itself mean the bytes are visible to a reader.
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod not_wasm_tests {
    use crate::local::LocalFileSystem;
    use crate::{ObjectStore, Path};
    use bytes::Bytes;
    use tempfile::TempDir;
    use tokio::io::AsyncWriteExt;

    /// Appending to a location whose parent directories do not exist yet
    /// creates them and writes the data.
    #[tokio::test]
    async fn creates_dir_if_not_present_append() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let location = Path::from("nested/file/test_file");

        let data = Bytes::from("arbitrary data");
        let expected_data = data.clone();

        let mut writer = integration.append(&location).await.unwrap();

        writer.write_all(data.as_ref()).await.unwrap();
        // Make sure the write has completed before reading back.
        writer.flush().await.unwrap();

        let read_data = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(&*read_data, expected_data);
    }

    /// Appending to a fresh top-level file stores exactly the written bytes.
    #[tokio::test]
    async fn unknown_length_append() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let location = Path::from("some_file");

        let data = Bytes::from("arbitrary data");
        let expected_data = data.clone();
        let mut writer = integration.append(&location).await.unwrap();

        writer.write_all(data.as_ref()).await.unwrap();
        // Make sure the write has completed before reading back.
        writer.flush().await.unwrap();

        let read_data = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(&*read_data, expected_data);
    }

    /// Two successive append handles on the same location add their data
    /// one after the other instead of overwriting.
    #[tokio::test]
    async fn multiple_append() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let location = Path::from("some_file");

        let data = vec![
            Bytes::from("arbitrary"),
            Bytes::from("data"),
            Bytes::from("gnz"),
        ];

        let mut writer = integration.append(&location).await.unwrap();
        for d in &data {
            writer.write_all(d).await.unwrap();
        }
        // Finish the first handle's writes before a second handle is opened,
        // so the two batches cannot interleave.
        writer.flush().await.unwrap();

        let mut writer = integration.append(&location).await.unwrap();
        for d in &data {
            writer.write_all(d).await.unwrap();
        }
        // Make sure the second batch has completed before reading back.
        writer.flush().await.unwrap();

        let read_data = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        let expected_data = Bytes::from("arbitrarydatagnzarbitrarydatagnz");
        assert_eq!(&*read_data, expected_data);
    }
}

0 comments on commit 9ce0ebb

Please sign in to comment.