Skip to content

Commit

Permalink
Allow for a same-thread doc compressor.
Browse files Browse the repository at this point in the history
In addition, this change isolates the doc compressor logic
and improves how io::Result errors are reported.

In the case of the same-thread doc compressor,
the blocks are also not copied.
  • Loading branch information
fulmicoton committed Sep 8, 2022
1 parent 4d634d6 commit 22b5d11
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 163 deletions.
61 changes: 60 additions & 1 deletion src/core/index_meta.rs
Expand Up @@ -235,6 +235,14 @@ impl InnerSegmentMeta {
}
}

/// Serde helper: default value for `docstore_compress_dedicated_thread`.
fn return_true() -> bool {
    true
}

/// Serde helper: lets serialization skip the flag when it still holds the
/// default value (`true`), keeping the JSON representation minimal.
fn is_true(value: &bool) -> bool {
    *value
}

/// Search Index Settings.
///
/// Contains settings which are applied on the whole
Expand All @@ -248,6 +256,12 @@ pub struct IndexSettings {
/// The `Compressor` used to compress the doc store.
#[serde(default)]
pub docstore_compression: Compressor,
/// If set to true, docstore compression will happen on a dedicated thread.
/// (defaults: true)
#[doc(hidden)]
#[serde(default = "return_true")]
#[serde(skip_serializing_if = "is_true")]
pub docstore_compress_dedicated_thread: bool,
#[serde(default = "default_docstore_blocksize")]
/// The size of each block that will be compressed and written to disk
pub docstore_blocksize: usize,
Expand All @@ -264,6 +278,7 @@ impl Default for IndexSettings {
sort_by_field: None,
docstore_compression: Compressor::default(),
docstore_blocksize: default_docstore_blocksize(),
docstore_compress_dedicated_thread: true,
}
}
}
Expand Down Expand Up @@ -395,7 +410,7 @@ mod tests {
use super::IndexMeta;
use crate::core::index_meta::UntrackedIndexMeta;
use crate::schema::{Schema, TEXT};
use crate::store::ZstdCompressor;
use crate::store::{Compressor, ZstdCompressor};
use crate::{IndexSettings, IndexSortByField, Order};

#[test]
Expand Down Expand Up @@ -447,6 +462,7 @@ mod tests {
compression_level: Some(4),
}),
docstore_blocksize: 1_000_000,
docstore_compress_dedicated_thread: true,
},
segments: Vec::new(),
schema,
Expand Down Expand Up @@ -485,4 +501,47 @@ mod tests {
"unknown zstd option \"bla\" at line 1 column 103".to_string()
);
}

/// Round-trip check for `IndexSettings::default()` serde behaviour:
/// the dedicated-thread flag is omitted from JSON while it equals its
/// default (`true`) and written out explicitly once set to `false`.
#[test]
#[cfg(feature = "lz4-compression")]
fn test_index_settings_default() {
    let mut index_settings = IndexSettings::default();
    let expected = IndexSettings {
        sort_by_field: None,
        docstore_compression: Compressor::default(),
        docstore_compress_dedicated_thread: true,
        docstore_blocksize: 16_384,
    };
    assert_eq!(index_settings, expected);
    {
        // Default flag value (`true`) is skipped during serialization.
        let json = serde_json::to_value(&index_settings).unwrap();
        assert_eq!(
            json,
            serde_json::json!({
                "docstore_compression": "lz4",
                "docstore_blocksize": 16384
            })
        );
        let roundtripped: IndexSettings = serde_json::from_value(json).unwrap();
        assert_eq!(roundtripped, index_settings);
    }
    {
        // Non-default flag value (`false`) appears explicitly in the JSON.
        index_settings.docstore_compress_dedicated_thread = false;
        let json = serde_json::to_value(&index_settings).unwrap();
        assert_eq!(
            json,
            serde_json::json!({
                "docstore_compression": "lz4",
                "docstore_blocksize": 16384,
                "docstore_compress_dedicated_thread": false,
            })
        );
        let roundtripped: IndexSettings = serde_json::from_value(json).unwrap();
        assert_eq!(roundtripped, index_settings);
    }
}
}
11 changes: 8 additions & 3 deletions src/indexer/segment_serializer.rs
Expand Up @@ -38,11 +38,16 @@ impl SegmentSerializer {
let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?;

let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
let compressor = segment.index().settings().docstore_compression;
let blocksize = segment.index().settings().docstore_blocksize;
let settings = segment.index().settings();
let store_writer = StoreWriter::new(
store_write,
settings.docstore_compression,
settings.docstore_blocksize,
settings.docstore_compress_dedicated_thread,
)?;
Ok(SegmentSerializer {
segment,
store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
store_writer,
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,
Expand Down
14 changes: 8 additions & 6 deletions src/indexer/segment_writer.rs
Expand Up @@ -380,12 +380,14 @@ fn remap_and_write(
let store_write = serializer
.segment_mut()
.open_write(SegmentComponent::Store)?;
let compressor = serializer.segment().index().settings().docstore_compression;
let block_size = serializer.segment().index().settings().docstore_blocksize;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
StoreWriter::new(store_write, compressor, block_size)?,
);
let settings = serializer.segment().index().settings();
let store_writer = StoreWriter::new(
store_write,
settings.docstore_compression,
settings.docstore_blocksize,
settings.docstore_compress_dedicated_thread,
)?;
let old_store_writer = std::mem::replace(&mut serializer.store_writer, store_writer);
old_store_writer.close()?;
let store_read = StoreReader::open(
serializer
Expand Down
41 changes: 31 additions & 10 deletions src/store/mod.rs
Expand Up @@ -43,6 +43,7 @@ pub use self::decompressors::Decompressor;
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
pub use self::reader::{CacheStats, StoreReader};
pub use self::writer::StoreWriter;
mod store_compressor;

#[cfg(feature = "lz4-compression")]
mod compression_lz4_block;
Expand Down Expand Up @@ -82,14 +83,16 @@ pub mod tests {
num_docs: usize,
compressor: Compressor,
blocksize: usize,
separate_thread: bool,
) -> Schema {
let mut schema_builder = Schema::builder();
let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
let field_title =
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
let mut store_writer =
StoreWriter::new(writer, compressor, blocksize, separate_thread).unwrap();
for i in 0..num_docs {
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());
Expand All @@ -112,7 +115,8 @@ pub mod tests {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
let schema =
write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file, 10)?;
Expand Down Expand Up @@ -148,11 +152,16 @@ pub mod tests {
Ok(())
}

fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
fn test_store(
compressor: Compressor,
blocksize: usize,
separate_thread: bool,
) -> crate::Result<()> {
let path = Path::new("store");
let directory = RamDirectory::create();
let store_wrt = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
let schema =
write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file, 10)?;
Expand All @@ -177,29 +186,39 @@ pub mod tests {
}

#[test]
fn test_store_noop() -> crate::Result<()> {
test_store(Compressor::None, BLOCK_SIZE)
fn test_store_no_compression_same_thread() -> crate::Result<()> {
test_store(Compressor::None, BLOCK_SIZE, false)
}

#[test]
fn test_store_no_compression() -> crate::Result<()> {
test_store(Compressor::None, BLOCK_SIZE, true)
}

#[cfg(feature = "lz4-compression")]
#[test]
fn test_store_lz4_block() -> crate::Result<()> {
test_store(Compressor::Lz4, BLOCK_SIZE)
test_store(Compressor::Lz4, BLOCK_SIZE, true)
}
#[cfg(feature = "snappy-compression")]
#[test]
fn test_store_snap() -> crate::Result<()> {
test_store(Compressor::Snappy, BLOCK_SIZE)
test_store(Compressor::Snappy, BLOCK_SIZE, true)
}
#[cfg(feature = "brotli-compression")]
#[test]
fn test_store_brotli() -> crate::Result<()> {
test_store(Compressor::Brotli, BLOCK_SIZE)
test_store(Compressor::Brotli, BLOCK_SIZE, true)
}

#[cfg(feature = "zstd-compression")]
#[test]
fn test_store_zstd() -> crate::Result<()> {
test_store(Compressor::Zstd(ZstdCompressor::default()), BLOCK_SIZE)
test_store(
Compressor::Zstd(ZstdCompressor::default()),
BLOCK_SIZE,
true,
)
}

#[test]
Expand Down Expand Up @@ -364,6 +383,7 @@ mod bench {
1_000,
Compressor::default(),
16_384,
true,
);
directory.delete(path).unwrap();
});
Expand All @@ -378,6 +398,7 @@ mod bench {
1_000,
Compressor::default(),
16_384,
true,
);
let store_file = directory.open_read(path).unwrap();
let store = StoreReader::open(store_file, 10).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/store/reader.rs
Expand Up @@ -393,7 +393,7 @@ mod tests {
let directory = RamDirectory::create();
let path = Path::new("store");
let writer = directory.open_write(path)?;
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE, true);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?;
Expand Down

0 comments on commit 22b5d11

Please sign in to comment.