diff --git a/CHANGELOG.md b/CHANGELOG.md index 69fdc7336f..caef5cb4c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ Tantivy 0.17 ================================ - Change to non-strict schema. Ignore fields in data which are not defined in schema. Previously this returned an error. #1211 - Facets are necessarily indexed. Existing index with indexed facets should work out of the box. Index without facets that are marked with index: false should be broken (but they were already broken in a sense). (@fulmicoton) #1195 . +- Fix a bug that could, in theory, impact durability on some filesystems [#1224](https://github.com/quickwit-inc/tantivy/issues/1224) +- Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225) Tantivy 0.16.2 ================================ diff --git a/src/directory/directory.rs b/src/directory/directory.rs index 36c0172c59..8f564d98ce 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -176,6 +176,12 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static { /// The file may or may not previously exist. fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()>; + /// Sync the directory. + /// + /// This call is required to ensure that newly created files are + /// effectively stored durably. + fn sync_directory(&self) -> io::Result<()>; + /// Acquire a lock in the given directory. /// /// The method is blocking or not depending on the `Lock` object. 
diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index 3034558efd..baff07934c 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -192,6 +192,7 @@ impl ManagedDirectory { for delete_file in &deleted_files { managed_paths_write.remove(delete_file); } + self.directory.sync_directory()?; save_managed_paths(self.directory.as_mut(), &meta_informations_wlock)?; } @@ -222,9 +223,21 @@ impl ManagedDirectory { .write() .expect("Managed file lock poisoned"); let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned()); - if has_changed { - save_managed_paths(self.directory.as_ref(), &meta_wlock)?; + if !has_changed { + return Ok(()); + } + save_managed_paths(self.directory.as_ref(), &meta_wlock)?; + if meta_wlock.managed_paths.len() >= 2 { + // The set already contains the path inserted above: two or more entries means this is not the first file we add. + // `.managed.json` has been already properly created and we do not need to + // sync its parent directory. + // + // Note we do not try to create the managed.json file, upon the + // creation of the ManagedDirectory because it would prevent + // the usage of a ReadOnly directory. + return Ok(()); } + self.directory.sync_directory()?; Ok(()) } @@ -310,6 +323,11 @@ impl Directory for ManagedDirectory { fn watch(&self, watch_callback: WatchCallback) -> crate::Result { self.directory.watch(watch_callback) } + + fn sync_directory(&self) -> io::Result<()> { + self.directory.sync_directory()?; + Ok(()) + } } impl Clone for ManagedDirectory { diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 51c16ed873..30a245f9e3 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -211,33 +211,6 @@ impl MmapDirectory { self.inner.root_path.join(relative_path) } - /// Sync the root directory. - /// In certain FS, this is required to persistently create - /// a file. 
- fn sync_directory(&self) -> Result<(), io::Error> { - let mut open_opts = OpenOptions::new(); - - // Linux needs read to be set, otherwise returns EINVAL - // write must not be set, or it fails with EISDIR - open_opts.read(true); - - // On Windows, opening a directory requires FILE_FLAG_BACKUP_SEMANTICS - // and calling sync_all() only works if write access is requested. - #[cfg(windows)] - { - use std::os::windows::fs::OpenOptionsExt; - use winapi::um::winbase; - - open_opts - .write(true) - .custom_flags(winbase::FILE_FLAG_BACKUP_SEMANTICS); - } - - let fd = open_opts.open(&self.inner.root_path)?; - fd.sync_data()?; - Ok(()) - } - /// Returns some statistical information /// about the Mmap cache. /// @@ -367,22 +340,17 @@ impl Directory for MmapDirectory { /// removed before the file is deleted. fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { let full_path = self.resolve_path(path); - match fs::remove_file(&full_path) { - Ok(_) => self.sync_directory().map_err(|e| DeleteError::IoError { - io_error: e, - filepath: path.to_path_buf(), - }), - Err(e) => { - if e.kind() == io::ErrorKind::NotFound { - Err(DeleteError::FileDoesNotExist(path.to_owned())) - } else { - Err(DeleteError::IoError { - io_error: e, - filepath: path.to_path_buf(), - }) + fs::remove_file(&full_path).map_err(|e| { + if e.kind() == io::ErrorKind::NotFound { + DeleteError::FileDoesNotExist(path.to_owned()) + } else { + DeleteError::IoError { + io_error: e, + filepath: path.to_path_buf(), } } - } + })?; + Ok(()) } fn exists(&self, path: &Path) -> Result { @@ -411,10 +379,14 @@ impl Directory for MmapDirectory { file.flush() .map_err(|io_error| OpenWriteError::wrap_io_error(io_error, path.to_path_buf()))?; - // Apparetntly, on some filesystem syncing the parent - // directory is required. - self.sync_directory() - .map_err(|io_err| OpenWriteError::wrap_io_error(io_err, path.to_path_buf()))?; + // Note that we deliberately do not sync the parent directory here. 
+ // Strictly speaking, this breaks part of the contract of the Directory. + // + // A newly created file may, in some cases, be created and even flushed to disk, + // and then lost... + // + // This event is not harmful in tantivy, as files only need to be durably + // saved upon commit, and we sync the directory in the atomic_write. let writer = SafeFileWriter::new(file); Ok(BufWriter::new(Box::new(writer))) @@ -470,6 +442,30 @@ impl Directory for MmapDirectory { fn watch(&self, watch_callback: WatchCallback) -> crate::Result { Ok(self.inner.watch(watch_callback)) } + + fn sync_directory(&self) -> Result<(), io::Error> { + let mut open_opts = OpenOptions::new(); + + // Linux needs read to be set, otherwise returns EINVAL + // write must not be set, or it fails with EISDIR + open_opts.read(true); + + // On Windows, opening a directory requires FILE_FLAG_BACKUP_SEMANTICS + // and calling sync_all() only works if write access is requested. + #[cfg(windows)] + { + use std::os::windows::fs::OpenOptionsExt; + use winapi::um::winbase; + + open_opts + .write(true) + .custom_flags(winbase::FILE_FLAG_BACKUP_SEMANTICS); + } + + let fd = open_opts.open(&self.inner.root_path)?; + fd.sync_all()?; + Ok(()) + } } #[cfg(test)] diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index a04c6f7478..3d33fd8d03 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -225,6 +225,10 @@ impl Directory for RamDirectory { fn watch(&self, watch_callback: WatchCallback) -> crate::Result { Ok(self.fs.write().unwrap().watch(watch_callback)) } + + fn sync_directory(&self) -> io::Result<()> { + Ok(()) + } } #[cfg(test)] diff --git a/src/indexer/segment_updater.rs b/src/indexer/segment_updater.rs index f7f7518db7..cae64d796f 100644 --- a/src/indexer/segment_updater.rs +++ b/src/indexer/segment_updater.rs @@ -61,7 +61,9 @@ pub fn save_new_metas( payload: None, }, directory, - ) + )?; + directory.sync_directory()?; + Ok(()) } /// Save the index meta file. 
@@ -82,6 +84,7 @@ fn save_metas(metas: &IndexMeta, directory: &dyn Directory) -> crate::Result<()> io::ErrorKind::Other, msg.unwrap_or_else(|| "Undefined".to_string()) )))); + directory.sync_directory()?; directory.atomic_write(&META_FILEPATH, &buffer[..])?; debug!("Saved metas {:?}", serde_json::to_string_pretty(&metas)); Ok(()) diff --git a/src/store/compression_lz4_block.rs b/src/store/compression_lz4_block.rs index d2c8e3f2e1..532eed2fa9 100644 --- a/src/store/compression_lz4_block.rs +++ b/src/store/compression_lz4_block.rs @@ -1,14 +1,15 @@ use std::io::{self}; use core::convert::TryInto; -use std::mem; use lz4_flex::{compress_into, decompress_into}; +use std::mem; #[inline] #[allow(clippy::uninit_vec)] pub fn compress(uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()> { compressed.clear(); - let maximum_ouput_size = mem::size_of::() + lz4_flex::block::get_maximum_output_size(uncompressed.len()); + let maximum_ouput_size = + mem::size_of::() + lz4_flex::block::get_maximum_output_size(uncompressed.len()); compressed.reserve(maximum_ouput_size); unsafe { compressed.set_len(maximum_ouput_size);