diff --git a/arrow-csv/src/writer.rs b/arrow-csv/src/writer.rs index b64e306b3a1..28a939d88f3 100644 --- a/arrow-csv/src/writer.rs +++ b/arrow-csv/src/writer.rs @@ -194,7 +194,7 @@ impl Writer { } /// A CSV writer builder -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct WriterBuilder { /// Optional column delimiter. Defaults to `b','` delimiter: Option, diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index c0c090cd0f0..c6bb7e85578 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -37,7 +37,6 @@ itertools = "0.10.1" parking_lot = { version = "0.12" } percent-encoding = "2.1" snafu = "0.7" -tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] } tracing = { version = "0.1" } url = "2.2" walkdir = "2" @@ -57,6 +56,12 @@ aws-types = { version = "0.54", optional = true } aws-credential-types = { version = "0.54", optional = true } aws-config = { version = "0.54", optional = true } +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util", "fs"] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] } + [features] cloud = ["serde", "serde_json", "quick-xml", "reqwest", "reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"] azure = ["cloud"] diff --git a/object_store/src/local.rs b/object_store/src/local.rs index f1733f54bab..ac0b02070d5 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -269,7 +269,6 @@ impl Config { impl ObjectStore for LocalFileSystem { async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> { let path = self.config.path_to_filesystem(location)?; - maybe_spawn_blocking(move || { let (mut file, suffix) = new_staged_upload(&path)?; let staging_path = staged_upload_path(&path, &suffix); @@ -313,6 +312,53 @@ impl ObjectStore for LocalFileSystem { .await } + async fn append( + &self, + location: &Path, + ) -> Result> { + #[cfg(not(target_arch = "wasm32"))] + // Get the path to the file from the configuration. + let path = self.config.path_to_filesystem(location)?; + loop { + // Create new `OpenOptions`. + let mut options = tokio::fs::OpenOptions::new(); + + // Attempt to open the file with the given options. + match options + .truncate(false) + .append(true) + .create(true) + .open(&path) + .await + { + // If the file was successfully opened, return it wrapped in a boxed `AsyncWrite` trait object. + Ok(file) => return Ok(Box::new(file)), + // If the error is that the file was not found, attempt to create the file and any necessary parent directories. + Err(err) if err.kind() == ErrorKind::NotFound => { + // Get the path to the parent directory of the file. + let parent = path + .parent() + // If the parent directory does not exist, return a `UnableToCreateFileSnafu` error. + .context(UnableToCreateFileSnafu { path: &path, err })?; + + // Create the parent directory and any necessary ancestors. + tokio::fs::create_dir_all(parent) + .await + // If creating the directory fails, return a `UnableToCreateDirSnafu` error. + .context(UnableToCreateDirSnafu { path: parent })?; + // Try again to open the file. + continue; + } + // If any other error occurs, return a `UnableToOpenFile` error. + Err(source) => { + return Err(Error::UnableToOpenFile { source, path }.into()) + } + } + } + #[cfg(target_arch = "wasm32")] + Err(super::Error::NotImplemented) + } + async fn get(&self, location: &Path) -> Result { let path = self.config.path_to_filesystem(location)?; maybe_spawn_blocking(move || { @@ -1305,3 +1351,94 @@ mod tests { integration.list_with_delimiter(Some(&path)).await.unwrap(); } } + +#[cfg(not(target_arch = "wasm32"))] +#[cfg(test)] +mod not_wasm_tests { + use crate::local::LocalFileSystem; + use crate::{ObjectStore, Path}; + use bytes::Bytes; + use tempfile::TempDir; + use tokio::io::AsyncWriteExt; + + #[tokio::test] + async fn creates_dir_if_not_present_append() { + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let location = Path::from("nested/file/test_file"); + + let data = Bytes::from("arbitrary data"); + let expected_data = data.clone(); + + let mut writer = integration.append(&location).await.unwrap(); + + writer.write_all(data.as_ref()).await.unwrap(); + + let read_data = integration + .get(&location) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(&*read_data, expected_data); + } + + #[tokio::test] + async fn unknown_length_append() { + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let location = Path::from("some_file"); + + let data = Bytes::from("arbitrary data"); + let expected_data = data.clone(); + let mut writer = integration.append(&location).await.unwrap(); + + writer.write_all(data.as_ref()).await.unwrap(); + + let read_data = integration + .get(&location) + .await + .unwrap() + .bytes() + .await + .unwrap(); + assert_eq!(&*read_data, expected_data); + } + + #[tokio::test] + async fn multiple_append() { + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let location = Path::from("some_file"); + + let data = vec![ + Bytes::from("arbitrary"), + Bytes::from("data"), + Bytes::from("gnz"), + ]; + + let mut writer = integration.append(&location).await.unwrap(); + for d in &data { + writer.write_all(d).await.unwrap(); + } + + let mut writer = integration.append(&location).await.unwrap(); + for d in &data { + writer.write_all(d).await.unwrap(); + } + + let read_data = integration + .get(&location) + .await + .unwrap() + .bytes() + .await + .unwrap(); + let expected_data = Bytes::from("arbitrarydatagnzarbitrarydatagnz"); + assert_eq!(&*read_data, expected_data); + } +}