From 0b1196ffaa6aae94b771dd43f4b2086f7490ec42 Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Thu, 2 Jun 2022 22:43:01 +0800
Subject: [PATCH] split into compressor/decompressor

use custom de/serializer for compressor
accept parameters like zstd(compression_level=5) as compressor
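
Example of the new configuration (an illustrative sketch; the field and
the serialized form below are the ones exercised by the new tests in
this patch, the `tantivy::store` paths assume the re-exports added in
src/store/mod.rs):

    use tantivy::store::{Compressor, ZstdCompressor};
    use tantivy::IndexSettings;

    // Pick zstd with an explicit level; leaving `compression_level` as
    // `None` falls back to zstd::DEFAULT_COMPRESSION_LEVEL (= 3).
    let settings = IndexSettings {
        docstore_compression: Compressor::Zstd(ZstdCompressor {
            compression_level: Some(5),
        }),
        ..IndexSettings::default()
    };
    // In meta.json this round-trips as:
    //   "docstore_compression": "zstd(compression_level=5)"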
r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zsstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#; + + let err = serde_json::from_str::(&json).unwrap_err(); + assert_eq!( + err.to_string(), + "unknown variant `zsstd`, expected one of `none`, `lz4`, `brotli`, `snappy`, `zstd`, \ + `zstd(compression_level=5)` at line 1 column 96" + .to_string() + ); + + let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(bla=10)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#; + + let err = serde_json::from_str::(&json).unwrap_err(); + assert_eq!( + err.to_string(), + "unknown zstd option \"bla\" at line 1 column 103".to_string() + ); + } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 9856bc134f..455ab7b693 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1073,7 +1073,7 @@ impl IndexMerger { // // take 7 in order to not walk over all checkpoints. || store_reader.block_checkpoints().take(7).count() < 6 - || store_reader.compressor() != store_writer.compressor() + || store_reader.decompressor() != store_writer.compressor().into() { for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) { let doc_bytes = doc_bytes_res?; diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 891fdcaa32..554503e668 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -40,10 +40,9 @@ impl SegmentSerializer { let postings_serializer = InvertedIndexSerializer::open(&mut segment)?; let compressor = segment.index().settings().docstore_compression; let blocksize = segment.index().settings().docstore_blocksize; - let compression_level = segment.index().settings().docstore_compression_level; Ok(SegmentSerializer { segment, - store_writer: StoreWriter::new(store_write, compressor, blocksize, compression_level), + store_writer: StoreWriter::new(store_write, compressor, blocksize), fast_field_serializer, fieldnorms_serializer: Some(fieldnorms_serializer), postings_serializer, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index dac596df94..c1ae1c6e88 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -373,14 +373,9 @@ fn remap_and_write( .open_write(SegmentComponent::Store)?; let compressor = serializer.segment().index().settings().docstore_compression; let block_size = serializer.segment().index().settings().docstore_blocksize; - let compression_level = serializer - .segment() - .index() - .settings() - .docstore_compression_level; let old_store_writer = std::mem::replace( &mut serializer.store_writer, - StoreWriter::new(store_write, compressor, block_size, compression_level), + StoreWriter::new(store_write, compressor, block_size), ); old_store_writer.close()?; let store_read = StoreReader::open( diff --git a/src/store/compressors.rs b/src/store/compressors.rs index e50ff2f927..ea2478015a 100644 --- a/src/store/compressors.rs +++ b/src/store/compressors.rs @@ -1,6 +1,6 @@ use std::io; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; pub trait StoreCompressor { fn compress(&self, uncompressed: 
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 9856bc134f..455ab7b693 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1073,7 +1073,7 @@ impl IndexMerger {
             //
             // take 7 in order to not walk over all checkpoints.
             || store_reader.block_checkpoints().take(7).count() < 6
-            || store_reader.compressor() != store_writer.compressor()
+            || store_reader.decompressor() != store_writer.compressor().into()
         {
             for doc_bytes_res in store_reader.iter_raw(reader.alive_bitset()) {
                 let doc_bytes = doc_bytes_res?;
diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs
index 891fdcaa32..554503e668 100644
--- a/src/indexer/segment_serializer.rs
+++ b/src/indexer/segment_serializer.rs
@@ -40,10 +40,9 @@ impl SegmentSerializer {
         let postings_serializer = InvertedIndexSerializer::open(&mut segment)?;
         let compressor = segment.index().settings().docstore_compression;
         let blocksize = segment.index().settings().docstore_blocksize;
-        let compression_level = segment.index().settings().docstore_compression_level;
         Ok(SegmentSerializer {
             segment,
-            store_writer: StoreWriter::new(store_write, compressor, blocksize, compression_level),
+            store_writer: StoreWriter::new(store_write, compressor, blocksize),
             fast_field_serializer,
             fieldnorms_serializer: Some(fieldnorms_serializer),
             postings_serializer,
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index dac596df94..c1ae1c6e88 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -373,14 +373,9 @@ fn remap_and_write(
         .open_write(SegmentComponent::Store)?;
     let compressor = serializer.segment().index().settings().docstore_compression;
     let block_size = serializer.segment().index().settings().docstore_blocksize;
-    let compression_level = serializer
-        .segment()
-        .index()
-        .settings()
-        .docstore_compression_level;
     let old_store_writer = std::mem::replace(
         &mut serializer.store_writer,
-        StoreWriter::new(store_write, compressor, block_size, compression_level),
+        StoreWriter::new(store_write, compressor, block_size),
     );
     old_store_writer.close()?;
     let store_read = StoreReader::open(
diff --git a/src/store/compressors.rs b/src/store/compressors.rs
index e50ff2f927..ea2478015a 100644
--- a/src/store/compressors.rs
+++ b/src/store/compressors.rs
@@ -1,6 +1,6 @@
 use std::io;
 
-use serde::{Deserialize, Serialize};
+use serde::{Deserialize, Deserializer, Serialize};
 
 pub trait StoreCompressor {
     fn compress(&self, uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>;
@@ -12,23 +12,118 @@ pub trait StoreCompressor {
 /// the compressor used to compress the doc store.
 ///
 /// The default is Lz4Block, but also depends on the enabled feature flags.
-#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Clone, Debug, Copy, PartialEq, Eq)]
 pub enum Compressor {
-    #[serde(rename = "none")]
     /// No compression
     None,
-    #[serde(rename = "lz4")]
     /// Use the lz4 compressor (block format)
     Lz4,
-    #[serde(rename = "brotli")]
     /// Use the brotli compressor
     Brotli,
-    #[serde(rename = "snappy")]
     /// Use the snap compressor
     Snappy,
-    #[serde(rename = "zstd")]
     /// Use the zstd compressor
-    Zstd,
+    Zstd(ZstdCompressor),
+}
+
+impl Serialize for Compressor {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        match *self {
+            Compressor::None => serializer.serialize_str("none"),
+            Compressor::Lz4 => serializer.serialize_str("lz4"),
+            Compressor::Brotli => serializer.serialize_str("brotli"),
+            Compressor::Snappy => serializer.serialize_str("snappy"),
+            Compressor::Zstd(zstd) => serializer.serialize_str(&zstd.ser_to_string()),
+        }
+    }
+}
+
+impl<'de> Deserialize<'de> for Compressor {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let buf = String::deserialize(deserializer)?;
+        let compressor = match buf.as_str() {
+            "none" => Compressor::None,
+            "lz4" => Compressor::Lz4,
+            "brotli" => Compressor::Brotli,
+            "snappy" => Compressor::Snappy,
+            _ => {
+                if buf.starts_with("zstd") {
+                    Compressor::Zstd(
+                        ZstdCompressor::deser_from_str(&buf).map_err(serde::de::Error::custom)?,
+                    )
+                } else {
+                    return Err(serde::de::Error::unknown_variant(
+                        &buf,
+                        &[
+                            "none",
+                            "lz4",
+                            "brotli",
+                            "snappy",
+                            "zstd",
+                            "zstd(compression_level=5)",
+                        ],
+                    ));
+                }
+            }
+        };
+
+        Ok(compressor)
+    }
+}
+
+#[derive(Clone, Default, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
+/// The Zstd compressor, with an optional compression level.
+pub struct ZstdCompressor {
+    /// The compression level; if unset, defaults to `zstd::DEFAULT_COMPRESSION_LEVEL` (= 3).
+    pub compression_level: Option<i32>,
+}
+
+impl ZstdCompressor {
+    fn deser_from_str(val: &str) -> Result<ZstdCompressor, String> {
+        if !val.starts_with("zstd") {
+            return Err(format!("needs to start with zstd, but got {}", val));
+        }
+        if val == "zstd" {
+            return Ok(ZstdCompressor::default());
+        }
+        let options = &val["zstd".len() + 1..val.len() - 1];
+
+        let mut compressor = ZstdCompressor::default();
+        for option in options.split(',') {
+            let (opt_name, value) = option
+                .split_once('=')
+                .ok_or_else(|| format!("no '=' found in option {:?}", option))?;
+
+            match opt_name {
+                "compression_level" => {
+                    let value = value.parse::<i32>().map_err(|err| {
+                        format!(
+                            "Could not parse value {} of option {}, e: {}",
+                            value, opt_name, err
+                        )
+                    })?;
+                    compressor.compression_level = Some(value);
+                }
+                _ => {
+                    return Err(format!("unknown zstd option {:?}", opt_name));
+                }
+            }
+        }
+        Ok(compressor)
+    }
+    fn ser_to_string(&self) -> String {
+        if let Some(compression_level) = self.compression_level {
+            format!("zstd(compression_level={})", compression_level)
+        } else {
+            "zstd".to_string()
+        }
+    }
 }
 
 impl Default for Compressor {
#[cfg(not(feature = "lz4-compression"))] - { - panic!("lz4-compression feature flag not activated"); - } - } - Self::Brotli => { - #[cfg(feature = "brotli-compression")] - { - super::compression_brotli::decompress(compressed, decompressed) - } - #[cfg(not(feature = "brotli-compression"))] - { - panic!("brotli-compression feature flag not activated"); - } - } - Self::Snappy => { - #[cfg(feature = "snappy-compression")] - { - super::compression_snap::decompress(compressed, decompressed) - } - #[cfg(not(feature = "snappy-compression"))] - { - panic!("snappy-compression feature flag not activated"); - } - } - Self::Zstd => { - #[cfg(feature = "zstd-compression")] - { - super::compression_zstd_block::decompress(compressed, decompressed) - } - #[cfg(not(feature = "zstd-compression"))] - { - panic!("zstd-compression feature flag not activated"); - } + #[test] + fn deser_zstd_test() { + assert_eq!( + ZstdCompressor::deser_from_str("zstd").unwrap(), + ZstdCompressor::default() + ); + + assert!(ZstdCompressor::deser_from_str("zzstd").is_err()); + assert!(ZstdCompressor::deser_from_str("zzstd()").is_err()); + assert_eq!( + ZstdCompressor::deser_from_str("zstd(compression_level=15)").unwrap(), + ZstdCompressor { + compression_level: Some(15) } - } + ); + assert_eq!( + ZstdCompressor::deser_from_str("zstd(compresion_level=15)").unwrap_err(), + "unknown zstd option \"compresion_level\"" + ); + assert_eq!( + ZstdCompressor::deser_from_str("zstd(compression_level->2)").unwrap_err(), + "no '=' found in option \"compression_level->2\"" + ); + assert_eq!( + ZstdCompressor::deser_from_str("zstd(compression_level=over9000)").unwrap_err(), + "Could not parse value over9000 of option compression_level, e: invalid digit found \ + in string" + ); } } diff --git a/src/store/decompressors.rs b/src/store/decompressors.rs new file mode 100644 index 0000000000..a333a5b88f --- /dev/null +++ b/src/store/decompressors.rs @@ -0,0 +1,140 @@ +use std::io; + +use serde::{Deserialize, Serialize}; + +use super::Compressor; + +pub trait StoreCompressor { + fn compress(&self, uncompressed: &[u8], compressed: &mut Vec) -> io::Result<()>; + fn decompress(&self, compressed: &[u8], decompressed: &mut Vec) -> io::Result<()>; + fn get_compressor_id() -> u8; +} + +/// Decompressor is deserialized from the doc store footer, when opening an index. 
diff --git a/src/store/decompressors.rs b/src/store/decompressors.rs
new file mode 100644
index 0000000000..a333a5b88f
--- /dev/null
+++ b/src/store/decompressors.rs
@@ -0,0 +1,140 @@
+use std::io;
+
+use serde::{Deserialize, Serialize};
+
+use super::Compressor;
+
+pub trait StoreCompressor {
+    fn compress(&self, uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>;
+    fn decompress(&self, compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()>;
+    fn get_compressor_id() -> u8;
+}
+
+/// Decompressor is deserialized from the doc store footer, when opening an index.
+#[derive(Clone, Debug, Copy, PartialEq, Eq, Serialize, Deserialize)]
+pub enum Decompressor {
+    /// No compression
+    None,
+    /// Use the lz4 compressor (block format)
+    Lz4,
+    /// Use the brotli compressor
+    Brotli,
+    /// Use the snap compressor
+    Snappy,
+    /// Use the zstd compressor
+    Zstd,
+}
+
+impl From<Compressor> for Decompressor {
+    fn from(compressor: Compressor) -> Self {
+        match compressor {
+            Compressor::None => Decompressor::None,
+            Compressor::Lz4 => Decompressor::Lz4,
+            Compressor::Brotli => Decompressor::Brotli,
+            Compressor::Snappy => Decompressor::Snappy,
+            Compressor::Zstd(_) => Decompressor::Zstd,
+        }
+    }
+}
+
+impl Decompressor {
+    pub(crate) fn from_id(id: u8) -> Decompressor {
+        match id {
+            0 => Decompressor::None,
+            1 => Decompressor::Lz4,
+            2 => Decompressor::Brotli,
+            3 => Decompressor::Snappy,
+            4 => Decompressor::Zstd,
+            _ => panic!("unknown compressor id {:?}", id),
+        }
+    }
+
+    pub(crate) fn get_id(&self) -> u8 {
+        match self {
+            Self::None => 0,
+            Self::Lz4 => 1,
+            Self::Brotli => 2,
+            Self::Snappy => 3,
+            Self::Zstd => 4,
+        }
+    }
+
+    pub(crate) fn decompress(&self, compressed_block: &[u8]) -> io::Result<Vec<u8>> {
+        let mut decompressed_block = vec![];
+        self.decompress_into(compressed_block, &mut decompressed_block)?;
+        Ok(decompressed_block)
+    }
+
+    #[inline]
+    pub(crate) fn decompress_into(
+        &self,
+        compressed: &[u8],
+        decompressed: &mut Vec<u8>,
+    ) -> io::Result<()> {
+        match self {
+            Self::None => {
+                decompressed.clear();
+                decompressed.extend_from_slice(compressed);
+                Ok(())
+            }
+            Self::Lz4 => {
+                #[cfg(feature = "lz4-compression")]
+                {
+                    super::compression_lz4_block::decompress(compressed, decompressed)
+                }
+                #[cfg(not(feature = "lz4-compression"))]
+                {
+                    panic!("lz4-compression feature flag not activated");
+                }
+            }
+            Self::Brotli => {
+                #[cfg(feature = "brotli-compression")]
+                {
+                    super::compression_brotli::decompress(compressed, decompressed)
+                }
+                #[cfg(not(feature = "brotli-compression"))]
+                {
+                    panic!("brotli-compression feature flag not activated");
+                }
+            }
+            Self::Snappy => {
+                #[cfg(feature = "snappy-compression")]
+                {
+                    super::compression_snap::decompress(compressed, decompressed)
+                }
+                #[cfg(not(feature = "snappy-compression"))]
+                {
+                    panic!("snappy-compression feature flag not activated");
+                }
+            }
+            Self::Zstd => {
+                #[cfg(feature = "zstd-compression")]
+                {
+                    super::compression_zstd_block::decompress(compressed, decompressed)
+                }
+                #[cfg(not(feature = "zstd-compression"))]
+                {
+                    panic!("zstd-compression feature flag not activated");
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::store::Compressor;
+
+    #[test]
+    fn compressor_decompressor_id_test() {
+        assert_eq!(Decompressor::from(Compressor::None), Decompressor::None);
+        assert_eq!(Decompressor::from(Compressor::Lz4), Decompressor::Lz4);
+        assert_eq!(Decompressor::from(Compressor::Brotli), Decompressor::Brotli);
+        assert_eq!(Decompressor::from(Compressor::Snappy), Decompressor::Snappy);
+        assert_eq!(
+            Decompressor::from(Compressor::Zstd(Default::default())),
+            Decompressor::Zstd
+        );
+    }
+}
diff --git a/src/store/footer.rs b/src/store/footer.rs
index 102fd675b8..880c1e2d2c 100644
--- a/src/store/footer.rs
+++ b/src/store/footer.rs
@@ -2,13 +2,13 @@ use std::io;
 
 use common::{BinarySerializable, FixedSize, HasLen};
 
+use super::Decompressor;
 use crate::directory::FileSlice;
-use crate::store::Compressor;
 
 #[derive(Debug, Clone, PartialEq)]
 pub struct DocStoreFooter {
     pub offset: u64,
-    pub compressor: Compressor,
+    pub decompressor: Decompressor,
 }
 
 /// Serialises the footer to a byte-array
@@ -18,7 +18,7 @@ impl BinarySerializable for DocStoreFooter {
     fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
         BinarySerializable::serialize(&self.offset, writer)?;
-        BinarySerializable::serialize(&self.compressor.get_id(), writer)?;
+        BinarySerializable::serialize(&self.decompressor.get_id(), writer)?;
         writer.write_all(&[0; 15])?;
         Ok(())
     }
@@ -30,7 +30,7 @@ impl BinarySerializable for DocStoreFooter {
         reader.read_exact(&mut skip_buf)?;
         Ok(DocStoreFooter {
             offset,
-            compressor: Compressor::from_id(compressor_id),
+            decompressor: Decompressor::from_id(compressor_id),
         })
     }
 }
@@ -40,8 +40,11 @@ impl FixedSize for DocStoreFooter {
 }
 
 impl DocStoreFooter {
-    pub fn new(offset: u64, compressor: Compressor) -> Self {
-        DocStoreFooter { offset, compressor }
+    pub fn new(offset: u64, decompressor: Decompressor) -> Self {
+        DocStoreFooter {
+            offset,
+            decompressor,
+        }
     }
 
     pub fn extract_footer(file: FileSlice) -> io::Result<(DocStoreFooter, FileSlice)> {
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 5efd22cec6..248d8eba07 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -33,11 +33,13 @@
 //! !
 
 mod compressors;
+mod decompressors;
 mod footer;
 mod index;
 mod reader;
 mod writer;
 
-pub use self::compressors::Compressor;
+pub use self::compressors::{Compressor, ZstdCompressor};
+pub use self::decompressors::Decompressor;
 pub use self::reader::StoreReader;
 pub use self::writer::StoreWriter;
@@ -86,7 +88,7 @@ pub mod tests {
         schema_builder.add_text_field("title", TextOptions::default().set_stored());
         let schema = schema_builder.build();
         {
-            let mut store_writer = StoreWriter::new(writer, compressor, blocksize, None);
+            let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
             for i in 0..num_docs {
                 let mut doc = Document::default();
                 doc.add_field_value(field_body, LOREM.to_string());
diff --git a/src/store/reader.rs b/src/store/reader.rs
index 3cb4f038db..791961a4ad 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -8,7 +8,7 @@ use ownedbytes::OwnedBytes;
 
 use super::footer::DocStoreFooter;
 use super::index::SkipIndex;
-use super::Compressor;
+use super::Decompressor;
 use crate::directory::FileSlice;
 use crate::error::DataCorruption;
 use crate::fastfield::AliveBitSet;
@@ -23,7 +23,7 @@ type Block = OwnedBytes;
 
 /// Reads document off tantivy's [`Store`](./index.html)
 pub struct StoreReader {
-    compressor: Compressor,
+    decompressor: Decompressor,
     data: FileSlice,
     skip_index: Arc<SkipIndex>,
     space_usage: StoreSpaceUsage,
@@ -87,7 +87,7 @@ impl StoreReader {
         let space_usage = StoreSpaceUsage::new(data_file.len(), offset_index_file.len());
         let skip_index = SkipIndex::open(index_data);
         Ok(StoreReader {
-            compressor: footer.compressor,
+            decompressor: footer.decompressor,
             data: data_file,
             cache: BlockCache {
                 cache: Mutex::new(LruCache::new(LRU_CACHE_CAPACITY)),
@@ -103,8 +103,8 @@ impl StoreReader {
         self.skip_index.checkpoints()
     }
 
-    pub(crate) fn compressor(&self) -> Compressor {
-        self.compressor
+    pub(crate) fn decompressor(&self) -> Decompressor {
+        self.decompressor
     }
 
     /// Returns the cache hit and miss statistics of the store reader.
@@ -141,7 +141,7 @@ impl StoreReader {
         let compressed_block = self.get_compressed_block(checkpoint)?;
 
         let decompressed_block =
-            OwnedBytes::new(self.compressor.decompress(compressed_block.as_ref())?);
+            OwnedBytes::new(self.decompressor.decompress(compressed_block.as_ref())?);
 
         self.cache
             .put_into_cache(cache_key, decompressed_block.clone());
@@ -351,6 +351,7 @@ mod tests {
     use crate::directory::RamDirectory;
     use crate::schema::{Document, Field};
     use crate::store::tests::write_lorem_ipsum_store;
+    use crate::store::Compressor;
     use crate::Directory;
 
     const BLOCK_SIZE: usize = 16_384;
diff --git a/src/store/writer.rs b/src/store/writer.rs
index bc5abb0a57..a351d0fcbc 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -5,7 +5,7 @@ use common::{BinarySerializable, CountingWriter, VInt};
 use super::compressors::Compressor;
 use super::footer::DocStoreFooter;
 use super::index::SkipIndexBuilder;
-use super::StoreReader;
+use super::{Decompressor, StoreReader};
 use crate::directory::{TerminatingWrite, WritePtr};
 use crate::schema::Document;
 use crate::store::index::Checkpoint;
@@ -21,7 +21,6 @@ use crate::DocId;
 pub struct StoreWriter {
     compressor: Compressor,
     block_size: usize,
-    compression_level: Option<i32>,
     doc: DocId,
     first_doc_in_block: DocId,
     offset_index_writer: SkipIndexBuilder,
@@ -35,16 +34,10 @@ impl StoreWriter {
     ///
     /// The store writer will write blocks on disk as
    /// documents are added.
-    pub fn new(
-        writer: WritePtr,
-        compressor: Compressor,
-        block_size: usize,
-        compression_level: Option<i32>,
-    ) -> StoreWriter {
+    pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
         StoreWriter {
             compressor,
             block_size,
-            compression_level,
             doc: 0,
             first_doc_in_block: 0,
             offset_index_writer: SkipIndexBuilder::new(),
@@ -136,11 +129,8 @@ impl StoreWriter {
     fn write_and_compress_block(&mut self) -> io::Result<()> {
         assert!(self.doc > 0);
         self.intermediary_buffer.clear();
-        self.compressor.compress(
-            &self.current_block[..],
-            &mut self.intermediary_buffer,
-            self.compression_level,
-        )?;
+        self.compressor
+            .compress_into(&self.current_block[..], &mut self.intermediary_buffer)?;
         let start_offset = self.writer.written_bytes() as usize;
         self.writer.write_all(&self.intermediary_buffer)?;
         let end_offset = self.writer.written_bytes() as usize;
@@ -162,7 +152,7 @@ impl StoreWriter {
             self.write_and_compress_block()?;
         }
         let header_offset: u64 = self.writer.written_bytes() as u64;
-        let footer = DocStoreFooter::new(header_offset, self.compressor);
+        let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor));
         self.offset_index_writer.write(&mut self.writer)?;
         footer.serialize(&mut self.writer)?;
         self.writer.terminate()