diff --git a/common/src/writer.rs b/common/src/writer.rs index 20f56221d7..9b8b86908d 100644 --- a/common/src/writer.rs +++ b/common/src/writer.rs @@ -62,7 +62,7 @@ impl TerminatingWrite for CountingWriter { pub struct AntiCallToken(()); /// Trait used to indicate when no more write need to be done on a writer -pub trait TerminatingWrite: Write { +pub trait TerminatingWrite: Write + Send { /// Indicate that the writer will no longer be used. Internally call terminate_ref. fn terminate(mut self) -> io::Result<()> where Self: Sized { diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 007934ba53..3c4f2a2abc 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1081,7 +1081,7 @@ impl IndexMerger { store_writer.store_bytes(&doc_bytes)?; } } else { - store_writer.stack(&store_reader)?; + store_writer.stack(store_reader)?; } } } diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 554503e668..ffb6a3dc33 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -42,7 +42,7 @@ impl SegmentSerializer { let blocksize = segment.index().settings().docstore_blocksize; Ok(SegmentSerializer { segment, - store_writer: StoreWriter::new(store_write, compressor, blocksize), + store_writer: StoreWriter::new(store_write, compressor, blocksize)?, fast_field_serializer, fieldnorms_serializer: Some(fieldnorms_serializer), postings_serializer, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 94469c5bca..308cca255e 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -382,7 +382,7 @@ fn remap_and_write( let block_size = serializer.segment().index().settings().docstore_blocksize; let old_store_writer = std::mem::replace( &mut serializer.store_writer, - StoreWriter::new(store_write, compressor, block_size), + StoreWriter::new(store_write, compressor, block_size)?, ); old_store_writer.close()?; let store_read = StoreReader::open( diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index 2401e23d6d..af572e758b 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -54,7 +54,7 @@ mod tests { fn test_skip_index_empty() -> io::Result<()> { let mut output: Vec = Vec::new(); let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); let mut skip_cursor = skip_index.checkpoints(); assert!(skip_cursor.next().is_none()); @@ -70,7 +70,7 @@ mod tests { byte_range: 0..3, }; skip_index_builder.insert(checkpoint.clone()); - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); let mut skip_cursor = skip_index.checkpoints(); assert_eq!(skip_cursor.next(), Some(checkpoint)); @@ -108,7 +108,7 @@ mod tests { for checkpoint in &checkpoints { skip_index_builder.insert(checkpoint.clone()); } - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); assert_eq!( @@ -167,7 +167,7 @@ mod tests { for checkpoint in &checkpoints { skip_index_builder.insert(checkpoint.clone()); } - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; assert_eq!(output.len(), 4035); let resulting_checkpoints: Vec = SkipIndex::open(OwnedBytes::new(output)) .checkpoints() @@ -238,7 +238,7 @@ mod tests { skip_index_builder.insert(checkpoint); } let mut buffer = Vec::new(); - skip_index_builder.write(&mut buffer).unwrap(); + skip_index_builder.serialize_into(&mut buffer).unwrap(); let skip_index = SkipIndex::open(OwnedBytes::new(buffer)); let iter_checkpoints: Vec = skip_index.checkpoints().collect(); assert_eq!(&checkpoints[..], &iter_checkpoints[..]); diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs index cbb899a219..2f34376cd4 100644 --- a/src/store/index/skip_index_builder.rs +++ b/src/store/index/skip_index_builder.rs @@ -87,7 +87,7 @@ impl SkipIndexBuilder { } } - pub fn write(mut self, output: &mut W) -> io::Result<()> { + pub fn serialize_into(mut self, output: &mut W) -> io::Result<()> { let mut last_pointer = None; for skip_layer in self.layers.iter_mut() { if let Some(checkpoint) = last_pointer { diff --git a/src/store/mod.rs b/src/store/mod.rs index 88ef9b579e..8dd035fe7f 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -88,7 +88,7 @@ pub mod tests { schema_builder.add_text_field("title", TextOptions::default().set_stored()); let schema = schema_builder.build(); { - let mut store_writer = StoreWriter::new(writer, compressor, blocksize); + let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap(); for i in 0..num_docs { let mut doc = Document::default(); doc.add_field_value(field_body, LOREM.to_string()); diff --git a/src/store/writer.rs b/src/store/writer.rs index a351d0fcbc..3327dd1e97 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,4 +1,6 @@ use std::io::{self, Write}; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; +use std::thread::{self, JoinHandle}; use common::{BinarySerializable, CountingWriter, VInt}; @@ -21,12 +23,19 @@ use crate::DocId; pub struct StoreWriter { compressor: Compressor, block_size: usize, - doc: DocId, - first_doc_in_block: DocId, - offset_index_writer: SkipIndexBuilder, - writer: CountingWriter, + num_docs_in_current_block: DocId, intermediary_buffer: Vec, current_block: Vec, + + // the channel to send data to the compressor thread. + compressor_sender: SyncSender, + // the handle to check for errors on the thread + compressor_thread_handle: JoinHandle>, +} + +enum BlockCompressorMessage { + AddBlock(DocumentBlock), + Stack(StoreReader), } impl StoreWriter { @@ -34,17 +43,43 @@ impl StoreWriter { /// /// The store writer will writes blocks on disc as /// document are added. - pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter { - StoreWriter { + pub fn new( + writer: WritePtr, + compressor: Compressor, + block_size: usize, + ) -> io::Result { + let thread_builder = thread::Builder::new().name("docstore-compressor-thread".to_string()); + + // Channel to send uncompressed data to compressor channel + let (block_sender, block_receiver): ( + SyncSender, + Receiver, + ) = sync_channel(3); + let thread_join_handle = thread_builder.spawn(move || { + let mut block_compressor = BlockCompressor::new(compressor, writer); + while let Ok(packet) = block_receiver.recv() { + match packet { + BlockCompressorMessage::AddBlock(block) => { + block_compressor.compress_block_and_write(block)?; + } + BlockCompressorMessage::Stack(store_reader) => { + block_compressor.stack(store_reader)?; + } + } + } + block_compressor.close()?; + Ok(()) + })?; + + Ok(StoreWriter { compressor, block_size, - doc: 0, - first_doc_in_block: 0, - offset_index_writer: SkipIndexBuilder::new(), - writer: CountingWriter::wrap(writer), + num_docs_in_current_block: 0, intermediary_buffer: Vec::new(), current_block: Vec::new(), - } + compressor_sender: block_sender, + compressor_thread_handle: thread_join_handle, + }) } pub(crate) fn compressor(&self) -> Compressor { @@ -56,18 +91,30 @@ impl StoreWriter { self.intermediary_buffer.capacity() + self.current_block.capacity() } - /// Store bytes of a serialized document. - /// - /// The document id is implicitely the current number - /// of documents. - pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { - let doc_num_bytes = serialized_document.len(); - VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; - self.current_block.write_all(serialized_document)?; - self.doc += 1; + /// Checks if the current block is full, and if so, compresses and flushes it. + fn check_flush_block(&mut self) -> io::Result<()> { if self.current_block.len() > self.block_size { - self.write_and_compress_block()?; + self.send_current_block_to_compressor()?; + } + Ok(()) + } + + /// Flushes current uncompressed block and sends to compressor. + fn send_current_block_to_compressor(&mut self) -> io::Result<()> { + // We don't do anything if the current block is empty to begin with. + if self.current_block.is_empty() { + return Ok(()); } + let block = DocumentBlock { + data: self.current_block.to_owned(), + num_docs_in_block: self.num_docs_in_current_block, + }; + self.current_block.clear(); + self.num_docs_in_current_block = 0; + self.compressor_sender + .send(BlockCompressorMessage::AddBlock(block)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + Ok(()) } @@ -82,13 +129,24 @@ impl StoreWriter { // intermediary_buffer due to the borrow checker // a new buffer costs ~1% indexing performance let doc_num_bytes = self.intermediary_buffer.len(); - VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; + VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); self.current_block .write_all(&self.intermediary_buffer[..])?; - self.doc += 1; - if self.current_block.len() > self.block_size { - self.write_and_compress_block()?; - } + self.num_docs_in_current_block += 1; + self.check_flush_block()?; + Ok(()) + } + + /// Store bytes of a serialized document. + /// + /// The document id is implicitely the current number + /// of documents. + pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { + let doc_num_bytes = serialized_document.len(); + VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); + self.current_block.extend_from_slice(serialized_document); + self.num_docs_in_current_block += 1; + self.check_flush_block()?; Ok(()) } @@ -96,65 +154,109 @@ impl StoreWriter { /// This method is an optimization compared to iterating over the documents /// in the store and adding them one by one, as the store's data will /// not be decompressed and then recompressed. - pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> { - if !self.current_block.is_empty() { - self.write_and_compress_block()?; - } - assert_eq!(self.first_doc_in_block, self.doc); - let doc_shift = self.doc; - let start_shift = self.writer.written_bytes() as usize; + pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { + // We flush the current block first before stacking + self.send_current_block_to_compressor()?; + self.compressor_sender + .send(BlockCompressorMessage::Stack(store_reader)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - // just bulk write all of the block of the given reader. - self.writer - .write_all(store_reader.block_data()?.as_slice())?; + Ok(()) + } + + /// Finalized the store writer. + /// + /// Compress the last unfinished block if any, + /// and serializes the skip list index on disc. + pub fn close(mut self) -> io::Result<()> { + self.send_current_block_to_compressor()?; + drop(self.compressor_sender); + + self.compressor_thread_handle + .join() + .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??; - // concatenate the index of the `store_reader`, after translating - // its start doc id and its start file offset. - for mut checkpoint in store_reader.block_checkpoints() { - checkpoint.doc_range.start += doc_shift; - checkpoint.doc_range.end += doc_shift; - checkpoint.byte_range.start += start_shift; - checkpoint.byte_range.end += start_shift; - self.register_checkpoint(checkpoint); - } Ok(()) } +} - fn register_checkpoint(&mut self, checkpoint: Checkpoint) { - self.offset_index_writer.insert(checkpoint.clone()); - self.first_doc_in_block = checkpoint.doc_range.end; - self.doc = checkpoint.doc_range.end; +/// BlockCompressor is separated from StoreWriter, to be run in an own thread +pub struct BlockCompressor { + compressor: Compressor, + first_doc_in_block: DocId, + offset_index_writer: SkipIndexBuilder, + intermediary_buffer: Vec, + writer: CountingWriter, +} + +struct DocumentBlock { + data: Vec, + num_docs_in_block: DocId, +} + +impl BlockCompressor { + fn new(compressor: Compressor, writer: WritePtr) -> Self { + Self { + compressor, + first_doc_in_block: 0, + offset_index_writer: SkipIndexBuilder::new(), + intermediary_buffer: Vec::new(), + writer: CountingWriter::wrap(writer), + } } - fn write_and_compress_block(&mut self) -> io::Result<()> { - assert!(self.doc > 0); + fn compress_block_and_write(&mut self, block: DocumentBlock) -> io::Result<()> { + assert!(block.num_docs_in_block > 0); self.intermediary_buffer.clear(); self.compressor - .compress_into(&self.current_block[..], &mut self.intermediary_buffer)?; + .compress_into(&block.data[..], &mut self.intermediary_buffer)?; + let start_offset = self.writer.written_bytes() as usize; self.writer.write_all(&self.intermediary_buffer)?; let end_offset = self.writer.written_bytes() as usize; - let end_doc = self.doc; + self.register_checkpoint(Checkpoint { - doc_range: self.first_doc_in_block..end_doc, + doc_range: self.first_doc_in_block..self.first_doc_in_block + block.num_docs_in_block, byte_range: start_offset..end_offset, }); - self.current_block.clear(); Ok(()) } - /// Finalized the store writer. - /// - /// Compress the last unfinished block if any, - /// and serializes the skip list index on disc. - pub fn close(mut self) -> io::Result<()> { - if !self.current_block.is_empty() { - self.write_and_compress_block()?; + fn register_checkpoint(&mut self, checkpoint: Checkpoint) { + self.offset_index_writer.insert(checkpoint.clone()); + self.first_doc_in_block = checkpoint.doc_range.end; + } + + /// Stacks a store reader on top of the documents written so far. + /// This method is an optimization compared to iterating over the documents + /// in the store and adding them one by one, as the store's data will + /// not be decompressed and then recompressed. + fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { + let doc_shift = self.first_doc_in_block; + let start_shift = self.writer.written_bytes() as usize; + + // just bulk write all of the block of the given reader. + self.writer + .write_all(store_reader.block_data()?.as_slice())?; + + // concatenate the index of the `store_reader`, after translating + // its start doc id and its start file offset. + for mut checkpoint in store_reader.block_checkpoints() { + checkpoint.doc_range.start += doc_shift; + checkpoint.doc_range.end += doc_shift; + checkpoint.byte_range.start += start_shift; + checkpoint.byte_range.end += start_shift; + self.register_checkpoint(checkpoint); } + Ok(()) + } + fn close(mut self) -> io::Result<()> { let header_offset: u64 = self.writer.written_bytes() as u64; - let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); - self.offset_index_writer.write(&mut self.writer)?; - footer.serialize(&mut self.writer)?; + let docstore_footer = + DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); + + self.offset_index_writer.serialize_into(&mut self.writer)?; + docstore_footer.serialize(&mut self.writer)?; self.writer.terminate() } }