diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 524986d268..2604f17de1 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -33,7 +33,7 @@ jobs:
           components: rustfmt, clippy

       - name: Run tests
-        run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
+        run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,zstd-compression,failpoints --verbose --workspace

       - name: Run tests quickwit feature
         run: cargo +stable test --features mmap,quickwit,failpoints --verbose --workspace
diff --git a/Cargo.toml b/Cargo.toml
index 4699343d33..bec5617d8d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@ tantivy-fst = "0.3.0"
 memmap2 = { version = "0.5.3", optional = true }
 lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
 brotli = { version = "3.3.4", optional = true }
+zstd = { version = "0.11", optional = true }
 snap = { version = "1.0.5", optional = true }
 tempfile = { version = "3.3.0", optional = true }
 log = "0.4.16"
@@ -93,6 +94,7 @@ mmap = ["fs2", "tempfile", "memmap2"]
 brotli-compression = ["brotli"]
 lz4-compression = ["lz4_flex"]
 snappy-compression = ["snap"]
+zstd-compression = ["zstd"]
 failpoints = ["fail/failpoints"]
 unstable = [] # useful for benches.
diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs
index 8291cf0322..1343c8d51b 100644
--- a/src/core/index_meta.rs
+++ b/src/core/index_meta.rs
@@ -239,7 +239,7 @@ impl InnerSegmentMeta {
 ///
 /// Contains settings which are applied on the whole
 /// index, like presorting documents.
-#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
 pub struct IndexSettings {
     /// Sorts the documents by information
     /// provided in `IndexSortByField`
@@ -248,7 +248,26 @@ pub struct IndexSettings {
     /// The `Compressor` used to compress the doc store.
     #[serde(default)]
     pub docstore_compression: Compressor,
+    #[serde(default = "default_docstore_blocksize")]
+    /// The size of each block that will be compressed and written to disk
+    pub docstore_blocksize: usize,
+}
+
+/// Must be a function to be compatible with serde defaults
+fn default_docstore_blocksize() -> usize {
+    16_384
+}
+
+impl Default for IndexSettings {
+    fn default() -> Self {
+        Self {
+            sort_by_field: None,
+            docstore_compression: Compressor::default(),
+            docstore_blocksize: default_docstore_blocksize(),
+        }
+    }
 }
+
 /// Settings to presort the documents in an index
 ///
 /// Presorting documents can greatly improve performance
@@ -401,7 +420,7 @@ mod tests {
         let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
         assert_eq!(
             json,
-            r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4"},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
+            r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
         );
         let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();
diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs
index 923b5b6dd1..554503e668 100644
--- a/src/indexer/segment_serializer.rs
+++ b/src/indexer/segment_serializer.rs
@@ -39,9 +39,10 @@ impl SegmentSerializer {
         let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;

         let compressor = segment.index().settings().docstore_compression;
+        let blocksize = segment.index().settings().docstore_blocksize;
         Ok(SegmentSerializer {
             segment,
-            store_writer: StoreWriter::new(store_write, compressor),
+            store_writer: StoreWriter::new(store_write, compressor, blocksize),
             fast_field_serializer,
             fieldnorms_serializer: Some(fieldnorms_serializer),
             postings_serializer,
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index 537b4f502a..c1ae1c6e88 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -372,9 +372,10 @@ fn remap_and_write(
         .segment_mut()
         .open_write(SegmentComponent::Store)?;
     let compressor = serializer.segment().index().settings().docstore_compression;
+    let block_size = serializer.segment().index().settings().docstore_blocksize;
     let old_store_writer = std::mem::replace(
         &mut serializer.store_writer,
-        StoreWriter::new(store_write, compressor),
+        StoreWriter::new(store_write, compressor, block_size),
     );
     old_store_writer.close()?;
     let store_read = StoreReader::open(
diff --git a/src/store/compression_zstd_block.rs b/src/store/compression_zstd_block.rs
new file mode 100644
index 0000000000..6879cc2fbd
--- /dev/null
+++ b/src/store/compression_zstd_block.rs
@@ -0,0 +1,50 @@
+use std::io;
+
+use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
+use zstd::DEFAULT_COMPRESSION_LEVEL;
+
+#[inline]
+pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
+    let count_size = std::mem::size_of::<u32>();
+    let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;
+
+    compressed.clear();
+    compressed.resize(max_size, 0);
+
+    let compressed_size = compress_to_buffer(
+        uncompressed,
+        &mut compressed[count_size..],
+        DEFAULT_COMPRESSION_LEVEL,
+    )?;
+
+    compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
+    compressed.resize(compressed_size + count_size, 0);
+
+    Ok(())
+}
+
+#[inline]
+pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
+    let count_size = std::mem::size_of::<u32>();
+    let uncompressed_size = u32::from_le_bytes(
+        compressed
+            .get(..count_size)
+            .ok_or(io::ErrorKind::InvalidData)?
+            .try_into()
+            .unwrap(),
+    ) as usize;
+
+    decompressed.clear();
+    decompressed.resize(uncompressed_size, 0);
+
+    let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)?;
+
+    if decompressed_size != uncompressed_size {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            "doc store block not completely decompressed, data corruption".to_string(),
+        ));
+    }
+
+    Ok(())
+}
diff --git a/src/store/compressors.rs b/src/store/compressors.rs
index f386d286e8..2e3ae5293e 100644
--- a/src/store/compressors.rs
+++ b/src/store/compressors.rs
@@ -26,6 +26,9 @@ pub enum Compressor {
     #[serde(rename = "snappy")]
     /// Use the snap compressor
     Snappy,
+    #[serde(rename = "zstd")]
+    /// Use the zstd compressor
+    Zstd,
 }

 impl Default for Compressor {
@@ -36,6 +39,8 @@ impl Default for Compressor {
             Compressor::Brotli
         } else if cfg!(feature = "snappy-compression") {
             Compressor::Snappy
+        } else if cfg!(feature = "zstd-compression") {
+            Compressor::Zstd
         } else {
             Compressor::None
         }
@@ -49,6 +54,7 @@ impl Compressor {
             1 => Compressor::Lz4,
             2 => Compressor::Brotli,
             3 => Compressor::Snappy,
+            4 => Compressor::Zstd,
             _ => panic!("unknown compressor id {:?}", id),
         }
     }
@@ -58,6 +64,7 @@
             Self::Lz4 => 1,
             Self::Brotli => 2,
             Self::Snappy => 3,
+            Self::Zstd => 4,
         }
     }
     #[inline]
@@ -98,6 +105,16 @@ impl Compressor {
                     panic!("snappy-compression feature flag not activated");
                 }
             }
+            Self::Zstd => {
+                #[cfg(feature = "zstd-compression")]
+                {
+                    super::compression_zstd_block::compress(uncompressed, compressed)
+                }
+                #[cfg(not(feature = "zstd-compression"))]
+                {
+                    panic!("zstd-compression feature flag not activated");
+                }
+            }
         }
     }

@@ -143,6 +160,16 @@ impl Compressor {
                     panic!("snappy-compression feature flag not activated");
                 }
             }
+            Self::Zstd => {
+                #[cfg(feature = "zstd-compression")]
+                {
+                    super::compression_zstd_block::decompress(compressed, decompressed)
+                }
+                #[cfg(not(feature = "zstd-compression"))]
+                {
+                    panic!("zstd-compression feature flag not activated");
+                }
+            }
         }
     }
 }
diff --git a/src/store/mod.rs b/src/store/mod.rs
index b378d1db5d..1fcec19091 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -50,6 +50,9 @@ mod compression_brotli;
 #[cfg(feature = "snappy-compression")]
 mod compression_snap;

+#[cfg(feature = "zstd-compression")]
+mod compression_zstd_block;
+
 #[cfg(test)]
 pub mod tests {

@@ -69,10 +72,13 @@ pub mod tests {
         sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
         mollit anim id est laborum.";

+    const BLOCK_SIZE: usize = 16_384;
+
     pub fn write_lorem_ipsum_store(
         writer: WritePtr,
         num_docs: usize,
         compressor: Compressor,
+        blocksize: usize,
     ) -> Schema {
         let mut schema_builder = Schema::builder();
         let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
@@ -80,7 +86,7 @@ pub mod tests {
             schema_builder.add_text_field("title", TextOptions::default().set_stored());
         let schema = schema_builder.build();
         {
-            let mut store_writer = StoreWriter::new(writer, compressor);
+            let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
             for i in 0..num_docs {
                 let mut doc = Document::default();
                 doc.add_field_value(field_body, LOREM.to_string());
@@ -103,7 +109,7 @@ pub mod tests {
         let path = Path::new("store");
         let directory = RamDirectory::create();
         let store_wrt = directory.open_write(path)?;
-        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4);
+        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
         let field_title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
         let store = StoreReader::open(store_file)?;
@@ -139,11 +145,11 @@ pub mod tests {
         Ok(())
     }

-    fn test_store(compressor: Compressor) -> crate::Result<()> {
+    fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
         let path = Path::new("store");
         let directory = RamDirectory::create();
         let store_wrt = directory.open_write(path)?;
-        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor);
+        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
         let field_title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
         let store = StoreReader::open(store_file)?;
@@ -169,22 +175,28 @@ pub mod tests {

     #[test]
     fn test_store_noop() -> crate::Result<()> {
-        test_store(Compressor::None)
+        test_store(Compressor::None, BLOCK_SIZE)
     }
     #[cfg(feature = "lz4-compression")]
     #[test]
     fn test_store_lz4_block() -> crate::Result<()> {
-        test_store(Compressor::Lz4)
+        test_store(Compressor::Lz4, BLOCK_SIZE)
     }
     #[cfg(feature = "snappy-compression")]
     #[test]
     fn test_store_snap() -> crate::Result<()> {
-        test_store(Compressor::Snappy)
+        test_store(Compressor::Snappy, BLOCK_SIZE)
     }
     #[cfg(feature = "brotli-compression")]
     #[test]
     fn test_store_brotli() -> crate::Result<()> {
-        test_store(Compressor::Brotli)
+        test_store(Compressor::Brotli, BLOCK_SIZE)
+    }
+
+    #[cfg(feature = "zstd-compression")]
+    #[test]
+    fn test_store_zstd() -> crate::Result<()> {
+        test_store(Compressor::Zstd, BLOCK_SIZE)
     }

     #[test]
@@ -348,6 +360,7 @@ mod bench {
             directory.open_write(path).unwrap(),
             1_000,
             Compressor::default(),
+            16_384,
         );
         directory.delete(path).unwrap();
     });
@@ -361,6 +374,7 @@
             directory.open_write(path).unwrap(),
             1_000,
             Compressor::default(),
+            16_384,
         );
         let store_file = directory.open_read(path).unwrap();
         let store = StoreReader::open(store_file).unwrap();
diff --git a/src/store/reader.rs b/src/store/reader.rs
index f3dadda2a4..830396b083 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -304,6 +304,8 @@ mod tests {
     use crate::store::tests::write_lorem_ipsum_store;
     use crate::Directory;

+    const BLOCK_SIZE: usize = 16_384;
+
     fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
         doc.get_first(*field).and_then(|f| f.as_text())
     }
@@ -313,7 +315,7 @@ mod tests {
         let directory = RamDirectory::create();
         let path = Path::new("store");
         let writer = directory.open_write(path)?;
-        let schema = write_lorem_ipsum_store(writer, 500, Compressor::default());
+        let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
         let title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
         let store = StoreReader::open(store_file)?;
diff --git a/src/store/writer.rs b/src/store/writer.rs
index 0efdb0fc61..96864c75bb 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -11,8 +11,6 @@ use crate::schema::Document;
 use crate::store::index::Checkpoint;
 use crate::DocId;

-const BLOCK_SIZE: usize = 16_384;
-
 /// Write tantivy's [`Store`](./index.html)
 ///
 /// Contrary to the other components of `tantivy`,
@@ -22,6 +20,7 @@ const BLOCK_SIZE: usize = 16_384;
 /// The skip list index on the other hand, is built in memory.
 pub struct StoreWriter {
     compressor: Compressor,
+    block_size: usize,
     doc: DocId,
     first_doc_in_block: DocId,
     offset_index_writer: SkipIndexBuilder,
@@ -35,9 +34,10 @@ impl StoreWriter {
     ///
     /// The store writer will write blocks on disc as
     /// documents are added.
-    pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
+    pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
         StoreWriter {
             compressor,
+            block_size,
             doc: 0,
             first_doc_in_block: 0,
             offset_index_writer: SkipIndexBuilder::new(),
@@ -65,7 +65,7 @@ impl StoreWriter {
         VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
         self.current_block.write_all(serialized_document)?;
         self.doc += 1;
-        if self.current_block.len() > BLOCK_SIZE {
+        if self.current_block.len() > self.block_size {
             self.write_and_compress_block()?;
         }
         Ok(())
@@ -86,7 +86,7 @@ impl StoreWriter {
         self.current_block
             .write_all(&self.intermediary_buffer[..])?;
         self.doc += 1;
-        if self.current_block.len() > BLOCK_SIZE {
+        if self.current_block.len() > self.block_size {
             self.write_and_compress_block()?;
         }
         Ok(())
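
Usage note: after this change, both the doc store compressor and the block size are ordinary `IndexSettings` fields. The sketch below is a minimal, hypothetical example of opting into zstd with a larger block size; it assumes the `zstd-compression` feature is enabled and uses the `Index::builder()` API, and everything outside this diff (field names, sizes, the in-RAM directory) is illustrative only.

    use tantivy::schema::{Schema, STORED, TEXT};
    use tantivy::store::Compressor;
    use tantivy::{doc, Index, IndexSettings};

    fn main() -> tantivy::Result<()> {
        let mut schema_builder = Schema::builder();
        let body = schema_builder.add_text_field("body", TEXT | STORED);
        let schema = schema_builder.build();

        // Compress doc store blocks with zstd, and flush a block once it
        // grows past 32_768 bytes instead of the default 16_384.
        let settings = IndexSettings {
            docstore_compression: Compressor::Zstd,
            docstore_blocksize: 32_768,
            ..IndexSettings::default()
        };

        let index = Index::builder()
            .schema(schema)
            .settings(settings)
            .create_in_ram()?;

        let mut index_writer = index.writer(15_000_000)?;
        let _ = index_writer.add_document(doc!(body => "stored and zstd-compressed"));
        index_writer.commit()?;
        Ok(())
    }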
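
Backward compatibility: the `#[serde(default = "default_docstore_blocksize")]` attribute, together with the hand-written `Default` impl that replaces the derived one, is what keeps pre-existing `meta.json` files loadable. A settings object serialized before this change carries no `docstore_blocksize` key, so deserialization falls back to 16_384, while newly written settings always carry the key explicitly (as the updated test above asserts). A standalone sketch of that behaviour, using a stand-in struct rather than tantivy's real `IndexSettings` and assuming `serde` (with the `derive` feature) and `serde_json` as dependencies:

    use serde::{Deserialize, Serialize};

    // Stand-in for the part of `IndexSettings` touched by this diff.
    #[derive(Serialize, Deserialize)]
    struct Settings {
        #[serde(default = "default_docstore_blocksize")]
        docstore_blocksize: usize,
    }

    /// Must be a function to be compatible with serde defaults
    fn default_docstore_blocksize() -> usize {
        16_384
    }

    fn main() {
        // An old meta.json has no `docstore_blocksize` key: the default kicks in.
        let old: Settings = serde_json::from_str("{}").unwrap();
        assert_eq!(old.docstore_blocksize, 16_384);

        // A freshly serialized settings object always writes the key.
        assert_eq!(
            serde_json::to_string(&old).unwrap(),
            r#"{"docstore_blocksize":16384}"#
        );
    }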