Add Zstd compression support, Make block size configurable via IndexSettings #1374

Merged
merged 8 commits on May 25, 2022
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -33,7 +33,7 @@ jobs:
components: rustfmt, clippy

- name: Run tests
run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,failpoints --verbose --workspace
run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,zstd-compression,failpoints --verbose --workspace

- name: Run tests quickwit feature
run: cargo +stable test --features mmap,quickwit,failpoints --verbose --workspace
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -23,6 +23,7 @@ tantivy-fst = "0.3.0"
memmap2 = { version = "0.5.3", optional = true }
lz4_flex = { version = "0.9.2", default-features = false, features = ["checked-decode"], optional = true }
brotli = { version = "3.3.4", optional = true }
zstd = { version = "0.11", optional = true }
snap = { version = "1.0.5", optional = true }
tempfile = { version = "3.3.0", optional = true }
log = "0.4.16"
@@ -93,6 +94,7 @@ mmap = ["fs2", "tempfile", "memmap2"]
brotli-compression = ["brotli"]
lz4-compression = ["lz4_flex"]
snappy-compression = ["snap"]
zstd-compression = ["zstd"]

failpoints = ["fail/failpoints"]
unstable = [] # useful for benches.
23 changes: 21 additions & 2 deletions src/core/index_meta.rs
@@ -239,7 +239,7 @@ impl InnerSegmentMeta {
///
/// Contains settings that are applied to the whole
/// index, such as presorting documents.
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexSettings {
/// Sorts the documents by information
/// provided in `IndexSortByField`
@@ -248,7 +248,26 @@ pub struct IndexSettings {
/// The `Compressor` used to compress the doc store.
#[serde(default)]
pub docstore_compression: Compressor,
#[serde(default = "default_docstore_blocksize")]
/// The size of each block that will be compressed and written to disk
pub docstore_blocksize: usize,
}

/// Must be a function to be compatible with serde defaults
fn default_docstore_blocksize() -> usize {
16_384
}

impl Default for IndexSettings {
fn default() -> Self {
Self {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_blocksize: default_docstore_blocksize(),
}
}
}
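
For orientation, here is a minimal sketch of how an index might opt into the new settings. The `Index::builder().settings(...)` / `create_in_ram()` calls and the import paths are assumed from tantivy's existing public API rather than shown in this diff; `65_536` is just an example value for the newly configurable block size.

```rust
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::store::Compressor;
use tantivy::{Index, IndexSettings};

fn create_index_with_custom_docstore() -> tantivy::Result<Index> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT | STORED);
    let schema = schema_builder.build();

    // Requires the `zstd-compression` feature to be enabled.
    let settings = IndexSettings {
        docstore_compression: Compressor::Zstd,
        docstore_blocksize: 65_536,
        ..IndexSettings::default()
    };

    Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()
}
```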

/// Settings to presort the documents in an index
///
/// Presorting documents can greatly improve performance
@@ -401,7 +420,7 @@ mod tests {
let json = serde_json::ser::to_string(&index_metas).expect("serialization failed");
assert_eq!(
json,
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4"},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"lz4","docstore_blocksize":16384},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#
);

let deser_meta: UntrackedIndexMeta = serde_json::from_str(&json).unwrap();
3 changes: 2 additions & 1 deletion src/indexer/segment_serializer.rs
@@ -39,9 +39,10 @@ impl SegmentSerializer {

let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
let compressor = segment.index().settings().docstore_compression;
let blocksize = segment.index().settings().docstore_blocksize;
Ok(SegmentSerializer {
segment,
store_writer: StoreWriter::new(store_write, compressor),
store_writer: StoreWriter::new(store_write, compressor, blocksize),
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,
3 changes: 2 additions & 1 deletion src/indexer/segment_writer.rs
@@ -372,9 +372,10 @@ fn remap_and_write(
.segment_mut()
.open_write(SegmentComponent::Store)?;
let compressor = serializer.segment().index().settings().docstore_compression;
let block_size = serializer.segment().index().settings().docstore_blocksize;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
StoreWriter::new(store_write, compressor),
StoreWriter::new(store_write, compressor, block_size),
);
old_store_writer.close()?;
let store_read = StoreReader::open(
50 changes: 50 additions & 0 deletions src/store/compression_zstd_block.rs
@@ -0,0 +1,50 @@
use std::io;

use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
use zstd::DEFAULT_COMPRESSION_LEVEL;

#[inline]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
let count_size = std::mem::size_of::<u32>();
let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;

compressed.clear();
compressed.resize(max_size, 0);

let compressed_size = compress_to_buffer(
uncompressed,
&mut compressed[count_size..],
DEFAULT_COMPRESSION_LEVEL,
)?;

compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
compressed.resize(compressed_size + count_size, 0);

Ok(())
}

#[inline]
pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
let count_size = std::mem::size_of::<u32>();
let uncompressed_size = u32::from_le_bytes(
compressed
.get(..count_size)
.ok_or(io::ErrorKind::InvalidData)?
.try_into()
.unwrap(),
) as usize;

decompressed.clear();
decompressed.resize(uncompressed_size, 0);

let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)?;

if decompressed_size != uncompressed_size {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"doc store block not completely decompressed, data corruption".to_string(),
));
}

Ok(())
}
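
As a sanity check on the framing used above (a 4-byte little-endian length prefix followed by the zstd frame), a small test-style round-trip sketch; it is illustrative only and not part of the diff:

```rust
#[cfg(all(test, feature = "zstd-compression"))]
mod zstd_block_tests {
    use super::{compress, decompress};

    #[test]
    fn roundtrip_restores_original_bytes() -> std::io::Result<()> {
        let original = b"the quick brown fox jumps over the lazy dog ".repeat(100);
        let mut compressed = Vec::new();
        let mut decompressed = Vec::new();

        compress(&original, &mut compressed)?;
        // At minimum the buffer holds the u32 length prefix.
        assert!(compressed.len() >= std::mem::size_of::<u32>());

        decompress(&compressed, &mut decompressed)?;
        assert_eq!(original, decompressed);
        Ok(())
    }
}
```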
27 changes: 27 additions & 0 deletions src/store/compressors.rs
@@ -26,6 +26,9 @@ pub enum Compressor {
#[serde(rename = "snappy")]
/// Use the snap compressor
Snappy,
#[serde(rename = "zstd")]
/// Use the zstd compressor
Zstd,
}

impl Default for Compressor {
@@ -36,6 +39,8 @@ impl Default for Compressor {
Compressor::Brotli
} else if cfg!(feature = "snappy-compression") {
Compressor::Snappy
} else if cfg!(feature = "zstd-compression") {
Compressor::Zstd
} else {
Compressor::None
}
@@ -49,6 +54,7 @@ impl Compressor {
1 => Compressor::Lz4,
2 => Compressor::Brotli,
3 => Compressor::Snappy,
4 => Compressor::Zstd,
_ => panic!("unknown compressor id {:?}", id),
}
}
@@ -58,6 +64,7 @@ impl Compressor {
Self::Lz4 => 1,
Self::Brotli => 2,
Self::Snappy => 3,
Self::Zstd => 4,
}
}
#[inline]
@@ -98,6 +105,16 @@ impl Compressor {
panic!("snappy-compression feature flag not activated");
}
}
Self::Zstd => {
#[cfg(feature = "zstd-compression")]
{
super::compression_zstd_block::compress(uncompressed, compressed)
}
#[cfg(not(feature = "zstd-compression"))]
{
panic!("zstd-compression feature flag not activated");
}
}
}
}

@@ -143,6 +160,16 @@ impl Compressor {
panic!("snappy-compression feature flag not activated");
}
}
Self::Zstd => {
#[cfg(feature = "zstd-compression")]
{
super::compression_zstd_block::decompress(compressed, decompressed)
}
#[cfg(not(feature = "zstd-compression"))]
{
panic!("zstd-compression feature flag not activated");
}
}
}
}
}
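
A brief sketch of the new variant's wire representations (the `from_id`/`get_id` method names are assumptions based on the truncated `impl Compressor` hunks above and may differ in the actual code):

```rust
#[cfg(feature = "zstd-compression")]
#[test]
fn zstd_compressor_identifiers() {
    // serde name introduced in this PR.
    assert_eq!(serde_json::to_string(&Compressor::Zstd).unwrap(), r#""zstd""#);
    // Numeric id 4, matching the match arms added above.
    assert_eq!(Compressor::Zstd.get_id(), 4);
    assert!(matches!(Compressor::from_id(4), Compressor::Zstd));
}
```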
30 changes: 22 additions & 8 deletions src/store/mod.rs
@@ -50,6 +50,9 @@ mod compression_brotli;
#[cfg(feature = "snappy-compression")]
mod compression_snap;

#[cfg(feature = "zstd-compression")]
mod compression_zstd_block;

#[cfg(test)]
pub mod tests {

@@ -69,18 +72,21 @@ pub mod tests {
sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
mollit anim id est laborum.";

const BLOCK_SIZE: usize = 16_384;

pub fn write_lorem_ipsum_store(
writer: WritePtr,
num_docs: usize,
compressor: Compressor,
blocksize: usize,
) -> Schema {
let mut schema_builder = Schema::builder();
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
let field_title =
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
let mut store_writer = StoreWriter::new(writer, compressor);
let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
for i in 0..num_docs {
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());
@@ -103,7 +109,7 @@ pub mod tests {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4);
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
@@ -139,11 +145,11 @@ pub mod tests {
Ok(())
}

fn test_store(compressor: Compressor) -> crate::Result<()> {
fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
Collaborator: Can we add a test in our test_store suite that tests for random (high-entropy) payloads? We spotted a bug in the original form of this PR.

Contributor (author): I can look into it later today, but just wanted to confirm: would the test be for the case where the input is incompressible, to the point where the compressed output is larger than the input? Just making sure the test covers the right thing.

Collaborator: Yes.

let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor);
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
@@ -169,22 +175,28 @@ pub mod tests {

#[test]
fn test_store_noop() -> crate::Result<()> {
test_store(Compressor::None)
test_store(Compressor::None, BLOCK_SIZE)
}
#[cfg(feature = "lz4-compression")]
#[test]
fn test_store_lz4_block() -> crate::Result<()> {
test_store(Compressor::Lz4)
test_store(Compressor::Lz4, BLOCK_SIZE)
}
#[cfg(feature = "snappy-compression")]
#[test]
fn test_store_snap() -> crate::Result<()> {
test_store(Compressor::Snappy)
test_store(Compressor::Snappy, BLOCK_SIZE)
}
#[cfg(feature = "brotli-compression")]
#[test]
fn test_store_brotli() -> crate::Result<()> {
test_store(Compressor::Brotli)
test_store(Compressor::Brotli, BLOCK_SIZE)
}

#[cfg(feature = "zstd-compression")]
#[test]
fn test_store_zstd() -> crate::Result<()> {
test_store(Compressor::Zstd, BLOCK_SIZE)
}
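
Following up on the review thread above about random payloads, one possible shape for such a test; the `rand` dev-dependency and the exact `StoreWriter`/`StoreReader` method names are assumptions based on the helpers used elsewhere in this module:

```rust
#[cfg(feature = "zstd-compression")]
#[test]
fn test_store_zstd_incompressible_payload() -> crate::Result<()> {
    use rand::{distributions::Alphanumeric, Rng};

    // High-entropy text compresses poorly, so a compressed block can end up
    // larger than its input; the store must still round-trip it correctly.
    let mut schema_builder = Schema::builder();
    let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
    let _schema = schema_builder.build();

    let path = Path::new("store");
    let directory = RamDirectory::create();
    {
        let store_wrt = directory.open_write(path)?;
        let mut store_writer = StoreWriter::new(store_wrt, Compressor::Zstd, BLOCK_SIZE);
        let mut rng = rand::thread_rng();
        for _ in 0..NUM_DOCS {
            let payload: String = (&mut rng)
                .sample_iter(Alphanumeric)
                .take(512)
                .map(char::from)
                .collect();
            let mut doc = Document::default();
            doc.add_field_value(field_body, payload);
            store_writer.store(&doc)?;
        }
        store_writer.close()?;
    }

    let store = StoreReader::open(directory.open_read(path)?)?;
    for doc_id in 0..NUM_DOCS as u32 {
        let doc = store.get(doc_id)?;
        assert!(doc.get_first(field_body).is_some());
    }
    Ok(())
}
```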

#[test]
@@ -348,6 +360,7 @@ mod bench {
directory.open_write(path).unwrap(),
1_000,
Compressor::default(),
16_384,
);
directory.delete(path).unwrap();
});
@@ -361,6 +374,7 @@ mod bench {
directory.open_write(path).unwrap(),
1_000,
Compressor::default(),
16_384,
);
let store_file = directory.open_read(path).unwrap();
let store = StoreReader::open(store_file).unwrap();
4 changes: 3 additions & 1 deletion src/store/reader.rs
@@ -304,6 +304,8 @@ mod tests {
use crate::store::tests::write_lorem_ipsum_store;
use crate::Directory;

const BLOCK_SIZE: usize = 16_384;

fn get_text_field<'a>(doc: &'a Document, field: &'a Field) -> Option<&'a str> {
doc.get_first(*field).and_then(|f| f.as_text())
}
@@ -313,7 +315,7 @@ mod tests {
let directory = RamDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default());
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
10 changes: 5 additions & 5 deletions src/store/writer.rs
@@ -11,8 +11,6 @@ use crate::schema::Document;
use crate::store::index::Checkpoint;
use crate::DocId;

const BLOCK_SIZE: usize = 16_384;

/// Write tantivy's [`Store`](./index.html)
///
/// Contrary to the other components of `tantivy`,
@@ -22,6 +20,7 @@ const BLOCK_SIZE: usize = 16_384;
/// The skip list index on the other hand, is built in memory.
pub struct StoreWriter {
compressor: Compressor,
block_size: usize,
doc: DocId,
first_doc_in_block: DocId,
offset_index_writer: SkipIndexBuilder,
@@ -35,9 +34,10 @@ impl StoreWriter {
///
/// The store writer will write blocks to disk as
/// documents are added.
pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
StoreWriter {
compressor,
block_size,
doc: 0,
first_doc_in_block: 0,
offset_index_writer: SkipIndexBuilder::new(),
@@ -65,7 +65,7 @@ impl StoreWriter {
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
self.current_block.write_all(serialized_document)?;
self.doc += 1;
if self.current_block.len() > BLOCK_SIZE {
Collaborator: That's true for the other stores, and it does not matter much, but shouldn't we make that >=? This is almost philosophy at this point, but my mental model is "we close a block once it is full", and being full means being greater than or equal to the block size.

Contributor (author): I just went with what was already there; personally I don't think it makes much difference since blocks are allowed to overflow anyway. Happy to go either way on this one.

if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
}
Ok(())
@@ -86,7 +86,7 @@
self.current_block
.write_all(&self.intermediary_buffer[..])?;
self.doc += 1;
if self.current_block.len() > BLOCK_SIZE {
if self.current_block.len() > self.block_size {
self.write_and_compress_block()?;
}
Ok(())