From aaa22ad225fe988072445e69258e1d85323c5c8a Mon Sep 17 00:00:00 2001
From: Kryesh <28243483+kryesh@users.noreply.github.com>
Date: Wed, 18 May 2022 11:13:15 +1000
Subject: [PATCH 1/8] Make block size configurable to allow for better compression ratios on large documents

---
 src/core/index_meta.rs            | 21 ++++++++++++++++++++-
 src/indexer/segment_serializer.rs |  3 ++-
 src/indexer/segment_writer.rs     |  3 ++-
 src/store/mod.rs                  | 19 +++++++++++--------
 src/store/reader.rs               |  4 +++-
 src/store/writer.rs               | 10 +++++-----
 6 files changed, 43 insertions(+), 17 deletions(-)

diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs
index 8291cf0322..a8f9c50f89 100644
--- a/src/core/index_meta.rs
+++ b/src/core/index_meta.rs
@@ -239,7 +239,7 @@ impl InnerSegmentMeta {
 ///
 /// Contains settings which are applied on the whole
 /// index, like presort documents.
-#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
 pub struct IndexSettings {
     /// Sorts the documents by information
     /// provided in `IndexSortByField`
@@ -248,7 +248,26 @@ pub struct IndexSettings {
     /// The `Compressor` used to compress the doc store.
     #[serde(default)]
     pub docstore_compression: Compressor,
+    #[serde(default = "default_docstore_blocksize")]
+    /// The size of each block that will be compressed and written to disk
+    pub docstore_blocksize: usize,
+}
+
+/// Must be a function to be compatible with serde defaults
+fn default_docstore_blocksize() -> usize {
+    16_384
+}
+
+impl Default for IndexSettings {
+    fn default() -> Self {
+        Self {
+            sort_by_field: None,
+            docstore_compression: Compressor::default(),
+            docstore_blocksize: default_docstore_blocksize(),
+        }
+    }
 }
+
 /// Settings to presort the documents in an index
 ///
 /// Presorting documents can greatly improve performance
diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs
index 923b5b6dd1..554503e668 100644
--- a/src/indexer/segment_serializer.rs
+++ b/src/indexer/segment_serializer.rs
@@ -39,9 +39,10 @@ impl SegmentSerializer {
         let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
 
         let compressor = segment.index().settings().docstore_compression;
+        let blocksize = segment.index().settings().docstore_blocksize;
         Ok(SegmentSerializer {
             segment,
-            store_writer: StoreWriter::new(store_write, compressor),
+            store_writer: StoreWriter::new(store_write, compressor, blocksize),
             fast_field_serializer,
             fieldnorms_serializer: Some(fieldnorms_serializer),
             postings_serializer,
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index 537b4f502a..c1ae1c6e88 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -372,9 +372,10 @@ fn remap_and_write(
         .segment_mut()
         .open_write(SegmentComponent::Store)?;
     let compressor = serializer.segment().index().settings().docstore_compression;
+    let block_size = serializer.segment().index().settings().docstore_blocksize;
     let old_store_writer = std::mem::replace(
         &mut serializer.store_writer,
-        StoreWriter::new(store_write, compressor),
+        StoreWriter::new(store_write, compressor, block_size),
     );
     old_store_writer.close()?;
     let store_read = StoreReader::open(
diff --git a/src/store/mod.rs b/src/store/mod.rs
index b378d1db5d..13f6d76b3a 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -69,10 +69,13 @@ pub mod tests {
         sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
         mollit anim id est laborum.";
 
+    const BLOCK_SIZE: usize = 16_384;
+
     pub fn write_lorem_ipsum_store(
         writer: WritePtr,
         num_docs: usize,
         compressor: Compressor,
+        blocksize: usize,
     ) -> Schema {
         let mut schema_builder = Schema::builder();
         let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
@@ -80,7 +83,7 @@ pub mod tests {
             schema_builder.add_text_field("title", TextOptions::default().set_stored());
         let schema = schema_builder.build();
         {
-            let mut store_writer = StoreWriter::new(writer, compressor);
+            let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
             for i in 0..num_docs {
                 let mut doc = Document::default();
                 doc.add_field_value(field_body, LOREM.to_string());
@@ -103,7 +106,7 @@ pub mod tests {
        let path = Path::new("store");
        let directory = RamDirectory::create();
        let store_wrt = directory.open_write(path)?;
-       let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4);
+       let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
        let field_title = schema.get_field("title").unwrap();
        let store_file = directory.open_read(path)?;
        let store = StoreReader::open(store_file)?;
@@ -139,11 +142,11 @@ pub mod tests {
         Ok(())
     }

-    fn test_store(compressor: Compressor) -> crate::Result<()> {
+    fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
         let path = Path::new("store");
         let directory = RamDirectory::create();
         let store_wrt = directory.open_write(path)?;
-        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor);
+        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
         let field_title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
         let store = StoreReader::open(store_file)?;
@@ -169,22 +172,22 @@ pub mod tests {
     #[test]
     fn test_store_noop() -> crate::Result<()> {
-        test_store(Compressor::None)
+        test_store(Compressor::None, BLOCK_SIZE)
     }
     #[cfg(feature = "lz4-compression")]
     #[test]
     fn test_store_lz4_block() -> crate::Result<()> {
-        test_store(Compressor::Lz4)
+        test_store(Compressor::Lz4, BLOCK_SIZE)
     }
     #[cfg(feature = "snappy-compression")]
     #[test]
     fn test_store_snap() -> crate::Result<()> {
-        test_store(Compressor::Snappy)
+        test_store(Compressor::Snappy, BLOCK_SIZE)
     }
     #[cfg(feature = "brotli-compression")]
     #[test]
     fn test_store_brotli() -> crate::Result<()> {
-        test_store(Compressor::Brotli)
+        test_store(Compressor::Brotli, BLOCK_SIZE)
     }

     #[test]
diff --git a/src/store/reader.rs b/src/store/reader.rs
index f3dadda2a4..830396b083 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -304,6 +304,8 @@ mod tests {
     use crate::store::tests::write_lorem_ipsum_store;
     use crate::Directory;

+    const BLOCK_SIZE: usize = 16_384;
+
     fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
         doc.get_first(*field).and_then(|f| f.as_text())
     }
@@ -313,7 +315,7 @@ mod tests {
         let directory = RamDirectory::create();
         let path = Path::new("store");
         let writer = directory.open_write(path)?;
-        let schema = write_lorem_ipsum_store(writer, 500, Compressor::default());
+        let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
         let title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
         let store = StoreReader::open(store_file)?;
diff --git a/src/store/writer.rs b/src/store/writer.rs
index 0efdb0fc61..96864c75bb 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -11,8 +11,6 @@ use crate::schema::Document;
 use crate::store::index::Checkpoint;
 use crate::DocId;

-const BLOCK_SIZE: usize = 16_384;
-
 /// Write tantivy's [`Store`](./index.html)
 ///
 /// Contrary to the other components of `tantivy`,
@@ -22,6 +20,7 @@
 /// The skip list index on the other hand, is built in memory.
 pub struct StoreWriter {
     compressor: Compressor,
+    block_size: usize,
     doc: DocId,
     first_doc_in_block: DocId,
     offset_index_writer: SkipIndexBuilder,
@@ -35,9 +34,10 @@ impl StoreWriter {
     ///
     /// The store writer will write blocks on disc as
     /// documents are added.
-    pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
+    pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
        StoreWriter {
            compressor,
+           block_size,
            doc: 0,
            first_doc_in_block: 0,
            offset_index_writer: SkipIndexBuilder::new(),
@@ -65,7 +65,7 @@ impl StoreWriter {
         VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
         self.current_block.write_all(serialized_document)?;
         self.doc += 1;
-        if self.current_block.len() > BLOCK_SIZE {
+        if self.current_block.len() > self.block_size {
             self.write_and_compress_block()?;
         }
         Ok(())
@@ -86,7 +86,7 @@ impl StoreWriter {
         self.current_block
             .write_all(&self.intermediary_buffer[..])?;
         self.doc += 1;
-        if self.current_block.len() > BLOCK_SIZE {
+        if self.current_block.len() > self.block_size {
             self.write_and_compress_block()?;
         }
         Ok(())

From 03040ed81d6d88f630baf49079624f18aaef2d1c Mon Sep 17 00:00:00 2001
From: Kryesh <28243483+kryesh@users.noreply.github.com>
Date: Wed, 18 May 2022 14:04:43 +1000
Subject: [PATCH 2/8] Add Zstd compression support

---
 Cargo.toml                          |  2 ++
 src/core/index_meta.rs              |  2 +-
 src/store/compression_zstd_block.rs | 54 +++++++++++++++++++++++++++++
 src/store/compressors.rs            | 27 +++++++++++++++
 src/store/mod.rs                    |  9 +++++
 5 files changed, 93 insertions(+), 1 deletion(-)
 create mode 100644 src/store/compression_zstd_block.rs

diff --git a/Cargo.toml b/Cargo.toml
index 4699343d33..bec5617d8d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@ tantivy-fst = "0.3.0"
 memmap2 = { version = "0.5.3", optional = true }
 lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
 brotli = { version = "3.3.4", optional = true }
+zstd = { version = "0.11", optional = true }
 snap = { version = "1.0.5", optional = true }
 tempfile = { version = "3.3.0", optional = true }
 log = "0.4.16"
@@ -93,6 +94,7 @@ mmap = ["fs2", "tempfile", "memmap2"]
 brotli-compression = ["brotli"]
 lz4-compression = ["lz4_flex"]
 snappy-compression = ["snap"]
+zstd-compression = ["zstd"]
 failpoints = ["fail/failpoints"]
 unstable = [] # useful for benches.
diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs
index a8f9c50f89..1343c8d51b 100644
--- a/src/core/index_meta.rs
+++ b/src/core/index_meta.rs
@@ -420,7 +420,7 @@ mod tests {
         let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
         assert_eq!(
             json,
-            r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4"},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
+            r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
         );

         let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();
diff --git a/src/store/compression_zstd_block.rs b/src/store/compression_zstd_block.rs
new file mode 100644
index 0000000000..ff18ff95f5
--- /dev/null
+++ b/src/store/compression_zstd_block.rs
@@ -0,0 +1,54 @@
+use std::io::{self, Read, Write};
+
+use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
+use zstd::DEFAULT_COMPRESSION_LEVEL;
+
+const USIZE_SIZE: usize = std::mem::size_of::<usize>();
+
+#[inline]
+pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
+    compressed.clear();
+
+    let max_size: usize = uncompressed.len() + USIZE_SIZE;
+    compressed.resize(max_size, 0);
+
+    let compressed_size = compress_to_buffer(
+        uncompressed,
+        &mut compressed[USIZE_SIZE..],
+        DEFAULT_COMPRESSION_LEVEL,
+    )
+    .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
+
+    compressed[0..USIZE_SIZE].copy_from_slice(&uncompressed.len().to_le_bytes());
+
+    compressed.resize(compressed_size + USIZE_SIZE, 0);
+
+    Ok(())
+}
+
+#[inline]
+pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
+    decompressed.clear();
+
+    let uncompressed_size_bytes: &[u8; USIZE_SIZE] = compressed
+        .get(..USIZE_SIZE)
+        .ok_or(io::ErrorKind::InvalidData)?
+        .try_into()
+        .unwrap();
+
+    let uncompressed_size = usize::from_le_bytes(*uncompressed_size_bytes);
+
+    decompressed.resize(uncompressed_size, 0);
+    decompressed.resize(decompressed.capacity(), 0);
+    let decompressed_size = decompress_to_buffer(&compressed[USIZE_SIZE..], decompressed)
+        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
+
+    if decompressed_size != uncompressed_size {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            "doc store block not completely decompressed, data corruption".to_string(),
+        ));
+    }
+
+    Ok(())
+}
diff --git a/src/store/compressors.rs b/src/store/compressors.rs
index f386d286e8..2e3ae5293e 100644
--- a/src/store/compressors.rs
+++ b/src/store/compressors.rs
@@ -26,6 +26,9 @@ pub enum Compressor {
     #[serde(rename = "snappy")]
     /// Use the snap compressor
     Snappy,
+    #[serde(rename = "zstd")]
+    /// Use the zstd compressor
+    Zstd,
 }

 impl Default for Compressor {
@@ -36,6 +39,8 @@ impl Default for Compressor {
             Compressor::Brotli
         } else if cfg!(feature = "snappy-compression") {
             Compressor::Snappy
+        } else if cfg!(feature = "zstd-compression") {
+            Compressor::Zstd
         } else {
             Compressor::None
         }
@@ -49,6 +54,7 @@ impl Compressor {
             1 => Compressor::Lz4,
             2 => Compressor::Brotli,
             3 => Compressor::Snappy,
+            4 => Compressor::Zstd,
             _ => panic!("unknown compressor id {:?}", id),
         }
     }
@@ -58,6 +64,7 @@ impl Compressor {
             Self::Lz4 => 1,
             Self::Brotli => 2,
             Self::Snappy => 3,
+            Self::Zstd => 4,
         }
     }
     #[inline]
@@ -98,6 +105,16 @@ impl Compressor {
                     panic!("snappy-compression feature flag not activated");
                 }
             }
+            Self::Zstd => {
+                #[cfg(feature = "zstd-compression")]
+                {
+                    super::compression_zstd_block::compress(uncompressed, compressed)
+                }
+                #[cfg(not(feature = "zstd-compression"))]
+                {
+                    panic!("zstd-compression feature flag not activated");
+                }
+            }
         }
     }

@@ -143,6 +160,16 @@ impl Compressor {
                     panic!("snappy-compression feature flag not activated");
                 }
             }
+            Self::Zstd => {
+                #[cfg(feature = "zstd-compression")]
+                {
+                    super::compression_zstd_block::decompress(compressed, decompressed)
+                }
+                #[cfg(not(feature = "zstd-compression"))]
+                {
+                    panic!("zstd-compression feature flag not activated");
+                }
+            }
         }
     }
 }
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 13f6d76b3a..907cb8637f 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -50,6 +50,9 @@ mod compression_brotli;
 #[cfg(feature = "snappy-compression")]
 mod compression_snap;

+#[cfg(feature = "zstd-compression")]
+mod compression_zstd_block;
+
 #[cfg(test)]
 pub mod tests {

@@ -190,6 +193,12 @@ pub mod tests {
         test_store(Compressor::Brotli, BLOCK_SIZE)
     }

+    #[cfg(feature = "zstd-compression")]
+    #[test]
+    fn test_store_zstd() -> crate::Result<()> {
+        test_store(Compressor::Zstd, BLOCK_SIZE)
+    }
+
     #[test]
     fn test_store_with_delete() -> crate::Result<()> {
         let mut schema_builder = schema::Schema::builder();

From d4e5b484378f80599d7e22cc3dc461ed72f2ca7d Mon Sep 17 00:00:00 2001
From: Kryesh <28243483+kryesh@users.noreply.github.com>
Date: Wed, 18 May 2022 19:37:28 +1000
Subject: [PATCH 3/8] Apply feedback - standardise on u64 and fix correct compression bounds

---
 src/store/compression_zstd_block.rs | 25 +++++++++++++------------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git a/src/store/compression_zstd_block.rs b/src/store/compression_zstd_block.rs
index ff18ff95f5..f4ac7f7b48 100644
--- a/src/store/compression_zstd_block.rs
+++ b/src/store/compression_zstd_block.rs
@@ -3,25 +3,25 @@ use std::io::{self, Read, Write};
 use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
 use zstd::DEFAULT_COMPRESSION_LEVEL;

-const USIZE_SIZE: usize = std::mem::size_of::<usize>();
-
 #[inline]
 pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
     compressed.clear();

-    let max_size: usize = uncompressed.len() + USIZE_SIZE;
+    let count_size = std::mem::size_of::<u64>();
+
+    let max_size: usize = zstd::compress_bound(uncompressed.len()) + count_size;
+
     compressed.resize(max_size, 0);

     let compressed_size = compress_to_buffer(
         uncompressed,
-        &mut compressed[USIZE_SIZE..],
+        &mut compressed[count_size..],
         DEFAULT_COMPRESSION_LEVEL,
     )
     .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;

-    compressed[0..USIZE_SIZE].copy_from_slice(&uncompressed.len().to_le_bytes());
-
-    compressed.resize(compressed_size + USIZE_SIZE, 0);
+    compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u64).to_le_bytes());
+    compressed.resize(compressed_size + count_size, 0);

     Ok(())
 }
@@ -30,17 +30,18 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()
 pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
     decompressed.clear();

-    let uncompressed_size_bytes: &[u8; USIZE_SIZE] = compressed
-        .get(..USIZE_SIZE)
+    let count_size = std::mem::size_of::<u64>();
+
+    let uncompressed_size_bytes: &[u8; count_size] = compressed
+        .get(..count_size)
         .ok_or(io::ErrorKind::InvalidData)?
         .try_into()
         .unwrap();

-    let uncompressed_size = usize::from_le_bytes(*uncompressed_size_bytes);
+    let uncompressed_size = u64::from_le_bytes(*uncompressed_size_bytes);

     decompressed.resize(uncompressed_size, 0);
-    decompressed.resize(decompressed.capacity(), 0);
-    let decompressed_size = decompress_to_buffer(&compressed[USIZE_SIZE..], decompressed)
+    let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)
         .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;

     if decompressed_size != uncompressed_size {

From 152e8238d7eddf9e753609accb2dd44cacdd2825 Mon Sep 17 00:00:00 2001
From: Kryesh <28243483+kryesh@users.noreply.github.com>
Date: Wed, 18 May 2022 19:49:10 +1000
Subject: [PATCH 4/8] Fix silly errors from running tests without feature flag

---
 src/store/compression_zstd_block.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/store/compression_zstd_block.rs b/src/store/compression_zstd_block.rs
index f4ac7f7b48..23ac6b40f7 100644
--- a/src/store/compression_zstd_block.rs
+++ b/src/store/compression_zstd_block.rs
@@ -9,7 +9,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()

     let count_size = std::mem::size_of::<u64>();

-    let max_size: usize = zstd::compress_bound(uncompressed.len()) + count_size;
+    let max_size: usize = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;

     compressed.resize(max_size, 0);

@@ -32,7 +32,7 @@ pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<(
     let count_size = std::mem::size_of::<u64>();

-    let uncompressed_size_bytes: &[u8; count_size] = compressed
+    let uncompressed_size_bytes: &[u8; 8] = compressed
         .get(..count_size)
         .ok_or(io::ErrorKind::InvalidData)?
         .try_into()
         .unwrap();
@@ -40,11 +40,11 @@ pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<(

     let uncompressed_size = u64::from_le_bytes(*uncompressed_size_bytes);

-    decompressed.resize(uncompressed_size, 0);
+    decompressed.resize(uncompressed_size as usize, 0);
     let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)
         .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;

-    if decompressed_size != uncompressed_size {
+    if decompressed_size != uncompressed_size as usize {
         return Err(io::Error::new(
             io::ErrorKind::InvalidData,
             "doc store block not completely decompressed, data corruption".to_string(),

From 0759bf94486b376ff30ba09364a684248e8e4fd6 Mon Sep 17 00:00:00 2001
From: Kryesh <28243483+kryesh@users.noreply.github.com>
Date: Wed, 18 May 2022 20:31:22 +1000
Subject: [PATCH 5/8] Cleanup zstd structure and serialise to u32 in line with lz4

---
 src/store/compression_zstd_block.rs | 33 +++++++++++++----------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/src/store/compression_zstd_block.rs b/src/store/compression_zstd_block.rs
index 23ac6b40f7..f2ae66cc95 100644
--- a/src/store/compression_zstd_block.rs
+++ b/src/store/compression_zstd_block.rs
@@ -5,12 +5,10 @@ use zstd::DEFAULT_COMPRESSION_LEVEL;

 #[inline]
 pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
-    compressed.clear();
-
-    let count_size = std::mem::size_of::<u64>();
-
-    let max_size: usize = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;
+    let count_size = std::mem::size_of::<u32>();
+    let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;
+    compressed.clear();
     compressed.resize(max_size, 0);

     let compressed_size = compress_to_buffer(
@@ -20,7 +18,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()
     )
     .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;

-    compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u64).to_le_bytes());
+    compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
     compressed.resize(compressed_size + count_size, 0);

     Ok(())
@@ -28,23 +26,22 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()

 #[inline]
 pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
-    decompressed.clear();
-
-    let count_size = std::mem::size_of::<u64>();
+    let count_size = std::mem::size_of::<u32>();
+    let uncompressed_size = u32::from_le_bytes(
+        compressed
+            .get(..count_size)
+            .ok_or(io::ErrorKind::InvalidData)?
+            .try_into()
+            .unwrap(),
+    ) as usize;

-    let uncompressed_size_bytes: &[u8; 8] = compressed
-        .get(..count_size)
-        .ok_or(io::ErrorKind::InvalidData)?
-        .try_into()
-        .unwrap();
-
-    let uncompressed_size = u64::from_le_bytes(*uncompressed_size_bytes);
-
-    decompressed.resize(uncompressed_size as usize, 0);
+    decompressed.clear();
+    decompressed.resize(uncompressed_size, 0);
+
     let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)
         .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;

-    if decompressed_size != uncompressed_size as usize {
+    if decompressed_size != uncompressed_size {
         return Err(io::Error::new(
             io::ErrorKind::InvalidData,
             "doc store block not completely decompressed, data corruption".to_string(),

From 6837a4d468379613b212aafdd34d9ca488108b56 Mon Sep 17 00:00:00 2001
From: Kryesh <28243483+kryesh@users.noreply.github.com>
Date: Wed, 18 May 2022 20:35:29 +1000
Subject: [PATCH 6/8] Fix bench

---
 src/store/mod.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/store/mod.rs b/src/store/mod.rs
index 907cb8637f..1fcec19091 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -360,6 +360,7 @@ mod bench {
                 directory.open_write(path).unwrap(),
                 1_000,
                 Compressor::default(),
+                16_384,
             );
             directory.delete(path).unwrap();
         });
@@ -373,6 +374,7 @@ mod bench {
             directory.open_write(path).unwrap(),
             1_000,
             Compressor::default(),
+            16_384,
         );
         let store_file = directory.open_read(path).unwrap();
         let store = StoreReader::open(store_file).unwrap();

From fc045e6bf98e4b342fb38bf1461197789f6905ff Mon Sep 17 00:00:00 2001
From: Kryesh <28243483+kryesh@users.noreply.github.com>
Date: Thu, 19 May 2022 10:34:02 +1000
Subject: [PATCH 7/8] Cleanup imports, remove unneeded error mapping

---
 src/store/compression_zstd_block.rs | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/src/store/compression_zstd_block.rs b/src/store/compression_zstd_block.rs
index f2ae66cc95..6879cc2fbd 100644
--- a/src/store/compression_zstd_block.rs
+++ b/src/store/compression_zstd_block.rs
@@ -1,4 +1,4 @@
-use std::io::{self, Read, Write};
+use std::io;

 use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
 use zstd::DEFAULT_COMPRESSION_LEVEL;
@@ -15,8 +15,7 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()
         uncompressed,
         &mut compressed[count_size..],
         DEFAULT_COMPRESSION_LEVEL,
-    )
-    .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
+    )?;

     compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
     compressed.resize(compressed_size + count_size, 0);
@@ -38,8 +37,7 @@ pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<(
     decompressed.clear();
     decompressed.resize(uncompressed_size, 0);

-    let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)
-        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
+    let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)?;

     if decompressed_size != uncompressed_size {
         return Err(io::Error::new(

From c95013b11eea026b9b85312d6133f67b80758382 Mon Sep 17 00:00:00 2001
From: Kryesh <28243483+kryesh@users.noreply.github.com>
Date: Thu, 19 May 2022 22:15:18 +1000
Subject: [PATCH 8/8] Add zstd-compression feature to github workflow tests

---
 .github/workflows/test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 524986d268..2604f17de1 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -33,7 +33,7 @@ jobs:
           components: rustfmt, clippy

       - name: Run tests
-        run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
+        run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,zstd-compression,failpoints --verbose --workspace

       - name: Run tests quickwit feature
         run: cargo +stable test --features mmap,quickwit,failpoints --verbose --workspace
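
For reviewers who want to exercise the series end to end, here is a minimal sketch of how the two knobs introduced by these patches fit together. The `IndexSettings` fields (`docstore_compression`, `docstore_blocksize`) and `Compressor::Zstd` come directly from PATCH 1 and PATCH 2; everything else is an assumption for illustration — the tantivy version, the `Index::builder()` / `create_in_ram()` calls, the 65_536 block size, and the writer heap budget are not part of the patches, and `IndexWriter::add_document`'s return type differs between tantivy versions.

```rust
// Assumed Cargo.toml entry, using the feature flag added in PATCH 2:
// tantivy = { version = "*", features = ["zstd-compression"] }
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::store::Compressor;
use tantivy::{doc, Index, IndexSettings};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let body = schema_builder.add_text_field("body", TEXT | STORED);
    let schema = schema_builder.build();

    // The two settings this series adds: which compressor the doc store uses,
    // and how many uncompressed bytes StoreWriter buffers before a block is
    // compressed and flushed. Larger blocks give zstd more context (better
    // ratio on large documents) at the cost of decompressing more bytes per
    // stored-document fetch.
    let settings = IndexSettings {
        sort_by_field: None,
        docstore_compression: Compressor::Zstd,
        docstore_blocksize: 65_536, // default_docstore_blocksize() is 16_384
    };

    let index = Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()?;

    let mut index_writer = index.writer(50_000_000)?;
    // add_document returns an opstamp (wrapped in Result in newer versions);
    // the result is ignored here so the sketch compiles across versions.
    let _ = index_writer.add_document(doc!(body => "a large stored document ..."));
    index_writer.commit()?;
    Ok(())
}
```

With `Compressor::None` the block size has no effect on ratio; the benefit of raising `docstore_blocksize` shows up when a single document is larger than the old hard-coded 16 KB block.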