From 456e0db919a38583164151ed8214ce6f830d11d6 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 7 Sep 2022 23:22:52 +0900 Subject: [PATCH] Allow for a same-thread doc compressor. In addition, it isolates the doc compressor logic, better reports io::Result. In the case of the same-thread doc compressor, the blocks are also not copied. --- src/core/index_meta.rs | 53 +++++- src/indexer/segment_serializer.rs | 11 +- src/indexer/segment_writer.rs | 2 +- src/store/mod.rs | 22 ++- src/store/reader.rs | 2 +- src/store/store_compressor.rs | 263 ++++++++++++++++++++++++++++++ src/store/writer.rs | 154 ++--------------- 7 files changed, 352 insertions(+), 155 deletions(-) create mode 100644 src/store/store_compressor.rs diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index e244414dcd..78ee00aa0d 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -235,6 +235,14 @@ impl InnerSegmentMeta { } } +fn return_true() -> bool { + true +} + +fn is_true(val: &bool) -> bool { + *val +} + /// Search Index Settings. /// /// Contains settings which are applied on the whole @@ -248,6 +256,12 @@ pub struct IndexSettings { /// The `Compressor` used to compress the doc store. #[serde(default)] pub docstore_compression: Compressor, + /// If set to true, docstore compression will happen on a dedicated thread. + /// (defaults: true) + #[doc(hidden)] + #[serde(default = "return_true")] + #[serde(skip_serializing_if = "is_true")] + pub docstore_compress_dedicated_thread: bool, #[serde(default = "default_docstore_blocksize")] /// The size of each block that will be compressed and written to disk pub docstore_blocksize: usize, @@ -264,6 +278,7 @@ impl Default for IndexSettings { sort_by_field: None, docstore_compression: Compressor::default(), docstore_blocksize: default_docstore_blocksize(), + docstore_compress_dedicated_thread: true, } } } @@ -395,7 +410,7 @@ mod tests { use super::IndexMeta; use crate::core::index_meta::UntrackedIndexMeta; use crate::schema::{Schema, TEXT}; - use crate::store::ZstdCompressor; + use crate::store::{ZstdCompressor, Compressor}; use crate::{IndexSettings, IndexSortByField, Order}; #[test] @@ -447,6 +462,7 @@ mod tests { compression_level: Some(4), }), docstore_blocksize: 1_000_000, + docstore_compress_dedicated_thread: true, }, segments: Vec::new(), schema, @@ -485,4 +501,39 @@ mod tests { "unknown zstd option \"bla\" at line 1 column 103".to_string() ); } + + #[test] + #[cfg(feature="lz4-compression")] + fn test_index_settings_default() { + let mut index_settings = IndexSettings::default(); + assert_eq!( + index_settings, + IndexSettings { + sort_by_field: None, + docstore_compression: Compressor::default(), + docstore_compress_dedicated_thread: true, + docstore_blocksize: 16_384 + } + ); + { + let index_settings_json = serde_json::to_value(&index_settings).unwrap(); + assert_eq!(index_settings_json, serde_json::json!({ + "docstore_compression": "lz4", + "docstore_blocksize": 16384 + })); + let index_settings_deser: IndexSettings = serde_json::from_value(index_settings_json).unwrap(); + assert_eq!(index_settings_deser, index_settings); + } + { + index_settings.docstore_compress_dedicated_thread = false; + let index_settings_json = serde_json::to_value(&index_settings).unwrap(); + assert_eq!(index_settings_json, serde_json::json!({ + "docstore_compression": "lz4", + "docstore_blocksize": 16384, + "docstore_compress_dedicated_thread": false, + })); + let index_settings_deser: IndexSettings = serde_json::from_value(index_settings_json).unwrap(); + assert_eq!(index_settings_deser, index_settings); + } + } } diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index ffb6a3dc33..6f8b83f450 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -38,11 +38,16 @@ impl SegmentSerializer { let fieldnorms_serializer = FieldNormsSerializer::from_write(fieldnorms_write)?; let postings_serializer = InvertedIndexSerializer::open(&mut segment)?; - let compressor = segment.index().settings().docstore_compression; - let blocksize = segment.index().settings().docstore_blocksize; + let settings = segment.index().settings(); + let store_writer = StoreWriter::new( + store_write, + settings.docstore_compression, + settings.docstore_blocksize, + settings.docstore_compress_dedicated_thread, + )?; Ok(SegmentSerializer { segment, - store_writer: StoreWriter::new(store_write, compressor, blocksize)?, + store_writer, fast_field_serializer, fieldnorms_serializer: Some(fieldnorms_serializer), postings_serializer, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 8d84d4954d..4ce5726092 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -384,7 +384,7 @@ fn remap_and_write( let block_size = serializer.segment().index().settings().docstore_blocksize; let old_store_writer = std::mem::replace( &mut serializer.store_writer, - StoreWriter::new(store_write, compressor, block_size)?, + StoreWriter::new(store_write, compressor, block_size, true)?, ); old_store_writer.close()?; let store_read = StoreReader::open( diff --git a/src/store/mod.rs b/src/store/mod.rs index bf57bbc1d0..e51d3f7ae7 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -43,6 +43,7 @@ pub use self::decompressors::Decompressor; pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY; pub use self::reader::{CacheStats, StoreReader}; pub use self::writer::StoreWriter; +mod store_compressor; #[cfg(feature = "lz4-compression")] mod compression_lz4_block; @@ -82,6 +83,7 @@ pub mod tests { num_docs: usize, compressor: Compressor, blocksize: usize, + separate_thread: bool ) -> Schema { let mut schema_builder = Schema::builder(); let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored()); @@ -89,7 +91,7 @@ pub mod tests { schema_builder.add_text_field("title", TextOptions::default().set_stored()); let schema = schema_builder.build(); { - let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap(); + let mut store_writer = StoreWriter::new(writer, compressor, blocksize, separate_thread).unwrap(); for i in 0..num_docs { let mut doc = Document::default(); doc.add_field_value(field_body, LOREM.to_string()); @@ -112,7 +114,7 @@ pub mod tests { let path = Path::new("store"); let directory = RamDirectory::create(); let store_wrt = directory.open_write(path)?; - let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE); + let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true); let field_title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; let store = StoreReader::open(store_file, 10)?; @@ -148,11 +150,11 @@ pub mod tests { Ok(()) } - fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> { + fn test_store(compressor: Compressor, blocksize: usize, separate_thread: bool) -> crate::Result<()> { let path = Path::new("store"); let directory = RamDirectory::create(); let store_wrt = directory.open_write(path)?; - let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize); + let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread); let field_title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; let store = StoreReader::open(store_file, 10)?; @@ -177,13 +179,19 @@ pub mod tests { } #[test] - fn test_store_noop() -> crate::Result<()> { - test_store(Compressor::None, BLOCK_SIZE) + fn test_store_no_compression_same_thread() -> crate::Result<()> { + test_store(Compressor::None, BLOCK_SIZE, false) } + + #[test] + fn test_store_no_compression() -> crate::Result<()> { + test_store(Compressor::None, BLOCK_SIZE, true) + } + #[cfg(feature = "lz4-compression")] #[test] fn test_store_lz4_block() -> crate::Result<()> { - test_store(Compressor::Lz4, BLOCK_SIZE) + test_store(Compressor::Lz4, BLOCK_SIZE, true) } #[cfg(feature = "snappy-compression")] #[test] diff --git a/src/store/reader.rs b/src/store/reader.rs index 2f8a26bc3f..4daed9e589 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -393,7 +393,7 @@ mod tests { let directory = RamDirectory::create(); let path = Path::new("store"); let writer = directory.open_write(path)?; - let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE); + let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE, true); let title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?; diff --git a/src/store/store_compressor.rs b/src/store/store_compressor.rs new file mode 100644 index 0000000000..65ec9c3c2a --- /dev/null +++ b/src/store/store_compressor.rs @@ -0,0 +1,263 @@ +use std::io::Write; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; +use std::thread::JoinHandle; +use std::{io, thread}; + +use common::{BinarySerializable, CountingWriter, TerminatingWrite}; + +use crate::directory::WritePtr; +use crate::store::footer::DocStoreFooter; +use crate::store::index::{Checkpoint, SkipIndexBuilder}; +use crate::store::{Compressor, Decompressor, StoreReader}; +use crate::DocId; + +pub struct BlockCompressor(Impls); + +// The struct wrapping an enum is just here to keep the +// impls private. +enum Impls { + SameThread(BlockCompressorImpl), + DedicatedThread(DedicatedThreadBlockCompressorImpl), +} + +impl BlockCompressor { + pub fn new(compressor: Compressor, wrt: WritePtr, dedicated_thread: bool) -> io::Result { + let block_compressor_impl = BlockCompressorImpl::new(compressor, wrt); + if dedicated_thread { + let dedicated_thread_compressor = + DedicatedThreadBlockCompressorImpl::new(block_compressor_impl)?; + Ok(BlockCompressor(Impls::DedicatedThread( + dedicated_thread_compressor, + ))) + } else { + Ok(BlockCompressor(Impls::SameThread(block_compressor_impl))) + } + } + + pub fn compress_block_and_write( + &mut self, + bytes: &[u8], + num_docs_in_block: u32, + ) -> io::Result<()> { + match &mut self.0 { + Impls::SameThread(block_compressor) => { + block_compressor.compress_block_and_write(bytes, num_docs_in_block)?; + } + Impls::DedicatedThread(different_thread_block_compressor) => { + different_thread_block_compressor + .compress_block_and_write(bytes, num_docs_in_block)?; + } + } + Ok(()) + } + + pub fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> { + match &mut self.0 { + Impls::SameThread(block_compressor) => { + block_compressor.stack(store_reader)?; + } + Impls::DedicatedThread(different_thread_block_compressor) => { + different_thread_block_compressor.stack_reader(store_reader)?; + } + } + Ok(()) + } + + pub fn close(self) -> io::Result<()> { + let imp = self.0; + match imp { + Impls::SameThread(block_compressor) => block_compressor.close(), + Impls::DedicatedThread(different_thread_block_compressor) => { + different_thread_block_compressor.close() + } + } + } +} + +struct BlockCompressorImpl { + compressor: Compressor, + first_doc_in_block: DocId, + offset_index_writer: SkipIndexBuilder, + intermediary_buffer: Vec, + writer: CountingWriter, +} + +impl BlockCompressorImpl { + fn new(compressor: Compressor, writer: WritePtr) -> Self { + Self { + compressor, + first_doc_in_block: 0, + offset_index_writer: SkipIndexBuilder::new(), + intermediary_buffer: Vec::new(), + writer: CountingWriter::wrap(writer), + } + } + + fn compress_block_and_write(&mut self, data: &[u8], num_docs_in_block: u32) -> io::Result<()> { + assert!(num_docs_in_block > 0); + self.intermediary_buffer.clear(); + self.compressor + .compress_into(data, &mut self.intermediary_buffer)?; + + let start_offset = self.writer.written_bytes() as usize; + self.writer.write_all(&self.intermediary_buffer)?; + let end_offset = self.writer.written_bytes() as usize; + + self.register_checkpoint(Checkpoint { + doc_range: self.first_doc_in_block..self.first_doc_in_block + num_docs_in_block, + byte_range: start_offset..end_offset, + }); + Ok(()) + } + + fn register_checkpoint(&mut self, checkpoint: Checkpoint) { + self.offset_index_writer.insert(checkpoint.clone()); + self.first_doc_in_block = checkpoint.doc_range.end; + } + + /// Stacks a store reader on top of the documents written so far. + /// This method is an optimization compared to iterating over the documents + /// in the store and adding them one by one, as the store's data will + /// not be decompressed and then recompressed. + fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { + let doc_shift = self.first_doc_in_block; + let start_shift = self.writer.written_bytes() as usize; + + // just bulk write all of the block of the given reader. + self.writer + .write_all(store_reader.block_data()?.as_slice())?; + + // concatenate the index of the `store_reader`, after translating + // its start doc id and its start file offset. + for mut checkpoint in store_reader.block_checkpoints() { + checkpoint.doc_range.start += doc_shift; + checkpoint.doc_range.end += doc_shift; + checkpoint.byte_range.start += start_shift; + checkpoint.byte_range.end += start_shift; + self.register_checkpoint(checkpoint); + } + Ok(()) + } + + fn close(mut self) -> io::Result<()> { + let header_offset: u64 = self.writer.written_bytes() as u64; + let docstore_footer = + DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); + self.offset_index_writer.serialize_into(&mut self.writer)?; + docstore_footer.serialize(&mut self.writer)?; + self.writer.terminate() + } +} + +// --------------------------------- +enum BlockCompressorMessage { + CompressBlockAndWrite { + block_data: Vec, + num_docs_in_block: u32, + }, + Stack(StoreReader), +} + +struct DedicatedThreadBlockCompressorImpl { + join_handle: Option>>, + tx: SyncSender, +} + +impl DedicatedThreadBlockCompressorImpl { + fn new(mut block_compressor: BlockCompressorImpl) -> io::Result { + let (tx, rx): ( + SyncSender, + Receiver, + ) = sync_channel(3); + let join_handle = thread::Builder::new() + .name("docstore-compressor-thread".to_string()) + .spawn(move || { + while let Ok(packet) = rx.recv() { + match packet { + BlockCompressorMessage::CompressBlockAndWrite { + block_data, + num_docs_in_block, + } => { + block_compressor + .compress_block_and_write(&block_data[..], num_docs_in_block)?; + } + BlockCompressorMessage::Stack(store_reader) => { + block_compressor.stack(store_reader)?; + } + } + } + block_compressor.close()?; + Ok(()) + })?; + Ok(DedicatedThreadBlockCompressorImpl { + join_handle: Some(join_handle), + tx, + }) + } + + fn compress_block_and_write(&mut self, bytes: &[u8], num_docs_in_block: u32) -> io::Result<()> { + self.send(BlockCompressorMessage::CompressBlockAndWrite { + block_data: bytes.to_vec(), + num_docs_in_block, + }) + } + + fn stack_reader(&mut self, store_reader: StoreReader) -> io::Result<()> { + self.send(BlockCompressorMessage::Stack(store_reader)) + } + + fn send(&mut self, msg: BlockCompressorMessage) -> io::Result<()> { + if self.tx.send(msg).is_err() { + harvest(self.join_handle.take())?; + return Err(io::Error::new(io::ErrorKind::Other, "Unidentified error.")); + } + Ok(()) + } + + fn close(self) -> io::Result<()> { + drop(self.tx); + harvest(self.join_handle) + } +} + +fn harvest(join_handle_opt: Option>>) -> io::Result<()> { + let join_handle = join_handle_opt + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Thread already joined."))?; + join_handle + .join() + .map_err(|_err| io::Error::new(io::ErrorKind::Other, "Compressing thread panicked."))? +} + +#[cfg(test)] +mod tests { + use std::io; + use std::path::Path; + + use crate::directory::RamDirectory; + use crate::store::store_compressor::BlockCompressor; + use crate::store::Compressor; + use crate::Directory; + + fn populate_block_compressor(mut block_compressor: BlockCompressor) -> io::Result<()> { + block_compressor.compress_block_and_write(b"hello", 1)?; + block_compressor.compress_block_and_write(b"happy", 1)?; + block_compressor.close()?; + Ok(()) + } + + #[test] + fn test_block_store_compressor_impls_yield_the_same_result() { + let ram_directory = RamDirectory::default(); + let path1 = Path::new("path1"); + let path2 = Path::new("path2"); + let wrt1 = ram_directory.open_write(path1).unwrap(); + let wrt2 = ram_directory.open_write(path2).unwrap(); + let block_compressor1 = BlockCompressor::new(Compressor::None, wrt1, true).unwrap(); + let block_compressor2 = BlockCompressor::new(Compressor::None, wrt2, false).unwrap(); + populate_block_compressor(block_compressor1).unwrap(); + populate_block_compressor(block_compressor2).unwrap(); + let data1 = ram_directory.open_read(path1).unwrap(); + let data2 = ram_directory.open_read(path2).unwrap(); + assert_eq!(data1.read_bytes().unwrap(), data2.read_bytes().unwrap()); + } +} diff --git a/src/store/writer.rs b/src/store/writer.rs index 6cdab9fd35..d0f27f4e89 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,16 +1,12 @@ use std::io::{self, Write}; -use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; -use std::thread::{self, JoinHandle}; -use common::{BinarySerializable, CountingWriter, VInt}; +use common::{BinarySerializable, VInt}; use super::compressors::Compressor; -use super::footer::DocStoreFooter; -use super::index::SkipIndexBuilder; -use super::{Decompressor, StoreReader}; -use crate::directory::{TerminatingWrite, WritePtr}; +use super::StoreReader; +use crate::directory::WritePtr; use crate::schema::Document; -use crate::store::index::Checkpoint; +use crate::store::store_compressor::BlockCompressor; use crate::DocId; /// Write tantivy's [`Store`](./index.html) @@ -26,16 +22,7 @@ pub struct StoreWriter { num_docs_in_current_block: DocId, intermediary_buffer: Vec, current_block: Vec, - - // the channel to send data to the compressor thread. - compressor_sender: SyncSender, - // the handle to check for errors on the thread - compressor_thread_handle: JoinHandle>, -} - -enum BlockCompressorMessage { - AddBlock(DocumentBlock), - Stack(StoreReader), + block_compressor: BlockCompressor, } impl StoreWriter { @@ -47,38 +34,16 @@ impl StoreWriter { writer: WritePtr, compressor: Compressor, block_size: usize, + dedicated_thread: bool, ) -> io::Result { - let thread_builder = thread::Builder::new().name("docstore-compressor-thread".to_string()); - - // Channel to send uncompressed data to compressor channel - let (block_sender, block_receiver): ( - SyncSender, - Receiver, - ) = sync_channel(3); - let thread_join_handle = thread_builder.spawn(move || { - let mut block_compressor = BlockCompressor::new(compressor, writer); - while let Ok(packet) = block_receiver.recv() { - match packet { - BlockCompressorMessage::AddBlock(block) => { - block_compressor.compress_block_and_write(block)?; - } - BlockCompressorMessage::Stack(store_reader) => { - block_compressor.stack(store_reader)?; - } - } - } - block_compressor.close()?; - Ok(()) - })?; - + let block_compressor = BlockCompressor::new(compressor, writer, dedicated_thread)?; Ok(StoreWriter { compressor, block_size, num_docs_in_current_block: 0, intermediary_buffer: Vec::new(), current_block: Vec::new(), - compressor_sender: block_sender, - compressor_thread_handle: thread_join_handle, + block_compressor, }) } @@ -105,16 +70,10 @@ impl StoreWriter { if self.current_block.is_empty() { return Ok(()); } - let block = DocumentBlock { - data: self.current_block.to_owned(), - num_docs_in_block: self.num_docs_in_current_block, - }; + self.block_compressor + .compress_block_and_write(&self.current_block, self.num_docs_in_current_block)?; self.current_block.clear(); self.num_docs_in_current_block = 0; - self.compressor_sender - .send(BlockCompressorMessage::AddBlock(block)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - Ok(()) } @@ -157,10 +116,7 @@ impl StoreWriter { pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { // We flush the current block first before stacking self.send_current_block_to_compressor()?; - self.compressor_sender - .send(BlockCompressorMessage::Stack(store_reader)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - + self.block_compressor.stack_reader(store_reader)?; Ok(()) } @@ -170,93 +126,7 @@ impl StoreWriter { /// and serializes the skip list index on disc. pub fn close(mut self) -> io::Result<()> { self.send_current_block_to_compressor()?; - drop(self.compressor_sender); - - self.compressor_thread_handle - .join() - .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??; - - Ok(()) - } -} - -/// BlockCompressor is separated from StoreWriter, to be run in an own thread -pub struct BlockCompressor { - compressor: Compressor, - first_doc_in_block: DocId, - offset_index_writer: SkipIndexBuilder, - intermediary_buffer: Vec, - writer: CountingWriter, -} - -struct DocumentBlock { - data: Vec, - num_docs_in_block: DocId, -} - -impl BlockCompressor { - fn new(compressor: Compressor, writer: WritePtr) -> Self { - Self { - compressor, - first_doc_in_block: 0, - offset_index_writer: SkipIndexBuilder::new(), - intermediary_buffer: Vec::new(), - writer: CountingWriter::wrap(writer), - } - } - - fn compress_block_and_write(&mut self, block: DocumentBlock) -> io::Result<()> { - assert!(block.num_docs_in_block > 0); - self.intermediary_buffer.clear(); - self.compressor - .compress_into(&block.data[..], &mut self.intermediary_buffer)?; - - let start_offset = self.writer.written_bytes() as usize; - self.writer.write_all(&self.intermediary_buffer)?; - let end_offset = self.writer.written_bytes() as usize; - - self.register_checkpoint(Checkpoint { - doc_range: self.first_doc_in_block..self.first_doc_in_block + block.num_docs_in_block, - byte_range: start_offset..end_offset, - }); + self.block_compressor.close()?; Ok(()) } - - fn register_checkpoint(&mut self, checkpoint: Checkpoint) { - self.offset_index_writer.insert(checkpoint.clone()); - self.first_doc_in_block = checkpoint.doc_range.end; - } - - /// Stacks a store reader on top of the documents written so far. - /// This method is an optimization compared to iterating over the documents - /// in the store and adding them one by one, as the store's data will - /// not be decompressed and then recompressed. - fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { - let doc_shift = self.first_doc_in_block; - let start_shift = self.writer.written_bytes() as usize; - - // just bulk write all of the block of the given reader. - self.writer - .write_all(store_reader.block_data()?.as_slice())?; - - // concatenate the index of the `store_reader`, after translating - // its start doc id and its start file offset. - for mut checkpoint in store_reader.block_checkpoints() { - checkpoint.doc_range.start += doc_shift; - checkpoint.doc_range.end += doc_shift; - checkpoint.byte_range.start += start_shift; - checkpoint.byte_range.end += start_shift; - self.register_checkpoint(checkpoint); - } - Ok(()) - } - fn close(mut self) -> io::Result<()> { - let header_offset: u64 = self.writer.written_bytes() as u64; - let docstore_footer = - DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); - - self.offset_index_writer.serialize_into(&mut self.writer)?; - docstore_footer.serialize(&mut self.writer)?; - self.writer.terminate() - } }