From 4c7dedef296dbdf902e0af6b6698465142bdad5c Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Thu, 16 Jun 2022 13:06:52 +0800 Subject: [PATCH 01/12] use seperate thread to compress block store Use seperate thread to compress block store for increased indexing performance. This allows to use slower compressors with higher compression ratio, with less or no perfomance impact (with enough cores). A seperate thread is spawned to compress the docstore, which handles single blocks and stacking from other docstores. The spawned compressor thread does not write, instead it sends back the compressed data. This is done in order to avoid writing multithreaded on the same file. --- src/indexer/merger.rs | 2 +- src/indexer/segment_serializer.rs | 2 +- src/indexer/segment_writer.rs | 2 +- src/store/index/mod.rs | 10 +- src/store/index/skip_index_builder.rs | 2 +- src/store/mod.rs | 2 +- src/store/writer.rs | 293 ++++++++++++++++++++------ 7 files changed, 236 insertions(+), 77 deletions(-) diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 007934ba53..3c4f2a2abc 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1081,7 +1081,7 @@ impl IndexMerger { store_writer.store_bytes(&doc_bytes)?; } } else { - store_writer.stack(&store_reader)?; + store_writer.stack(store_reader)?; } } } diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 554503e668..ffb6a3dc33 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -42,7 +42,7 @@ impl SegmentSerializer { let blocksize = segment.index().settings().docstore_blocksize; Ok(SegmentSerializer { segment, - store_writer: StoreWriter::new(store_write, compressor, blocksize), + store_writer: StoreWriter::new(store_write, compressor, blocksize)?, fast_field_serializer, fieldnorms_serializer: Some(fieldnorms_serializer), postings_serializer, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 94469c5bca..308cca255e 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -382,7 +382,7 @@ fn remap_and_write( let block_size = serializer.segment().index().settings().docstore_blocksize; let old_store_writer = std::mem::replace( &mut serializer.store_writer, - StoreWriter::new(store_write, compressor, block_size), + StoreWriter::new(store_write, compressor, block_size)?, ); old_store_writer.close()?; let store_read = StoreReader::open( diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index 2401e23d6d..af572e758b 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -54,7 +54,7 @@ mod tests { fn test_skip_index_empty() -> io::Result<()> { let mut output: Vec = Vec::new(); let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); let mut skip_cursor = skip_index.checkpoints(); assert!(skip_cursor.next().is_none()); @@ -70,7 +70,7 @@ mod tests { byte_range: 0..3, }; skip_index_builder.insert(checkpoint.clone()); - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); let mut skip_cursor = skip_index.checkpoints(); assert_eq!(skip_cursor.next(), Some(checkpoint)); @@ -108,7 +108,7 @@ mod tests { for checkpoint in &checkpoints { skip_index_builder.insert(checkpoint.clone()); } - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); assert_eq!( @@ -167,7 +167,7 @@ mod tests { for checkpoint in &checkpoints { skip_index_builder.insert(checkpoint.clone()); } - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; assert_eq!(output.len(), 4035); let resulting_checkpoints: Vec = SkipIndex::open(OwnedBytes::new(output)) .checkpoints() @@ -238,7 +238,7 @@ mod tests { skip_index_builder.insert(checkpoint); } let mut buffer = Vec::new(); - skip_index_builder.write(&mut buffer).unwrap(); + skip_index_builder.serialize_into(&mut buffer).unwrap(); let skip_index = SkipIndex::open(OwnedBytes::new(buffer)); let iter_checkpoints: Vec = skip_index.checkpoints().collect(); assert_eq!(&checkpoints[..], &iter_checkpoints[..]); diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs index cbb899a219..2f34376cd4 100644 --- a/src/store/index/skip_index_builder.rs +++ b/src/store/index/skip_index_builder.rs @@ -87,7 +87,7 @@ impl SkipIndexBuilder { } } - pub fn write(mut self, output: &mut W) -> io::Result<()> { + pub fn serialize_into(mut self, output: &mut W) -> io::Result<()> { let mut last_pointer = None; for skip_layer in self.layers.iter_mut() { if let Some(checkpoint) = last_pointer { diff --git a/src/store/mod.rs b/src/store/mod.rs index 88ef9b579e..8dd035fe7f 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -88,7 +88,7 @@ pub mod tests { schema_builder.add_text_field("title", TextOptions::default().set_stored()); let schema = schema_builder.build(); { - let mut store_writer = StoreWriter::new(writer, compressor, blocksize); + let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap(); for i in 0..num_docs { let mut doc = Document::default(); doc.add_field_value(field_body, LOREM.to_string()); diff --git a/src/store/writer.rs b/src/store/writer.rs index a351d0fcbc..140dd08a78 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,6 +1,9 @@ use std::io::{self, Write}; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; +use std::thread::{self, JoinHandle}; -use common::{BinarySerializable, CountingWriter, VInt}; +use common::{BinarySerializable, VInt}; +use ownedbytes::OwnedBytes; use super::compressors::Compressor; use super::footer::DocStoreFooter; @@ -21,12 +24,23 @@ use crate::DocId; pub struct StoreWriter { compressor: Compressor, block_size: usize, - doc: DocId, - first_doc_in_block: DocId, - offset_index_writer: SkipIndexBuilder, - writer: CountingWriter, + num_docs_in_current_block: DocId, intermediary_buffer: Vec, current_block: Vec, + + writer: WritePtr, + // the channel to communicate with the compressor thread. + compressor_sender: SyncSender, + // the channel to receive data to write from the compressor thread. + data_receiver: Receiver, + + // the handle to check for errors on the thread + compressor_thread_handle: JoinHandle>, +} + +enum BlockCompressorMessage { + AddBlock(DocumentBlock), + Stack((StoreReader, DocumentBlock)), } impl StoreWriter { @@ -34,17 +48,43 @@ impl StoreWriter { /// /// The store writer will writes blocks on disc as /// document are added. - pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter { - StoreWriter { + pub fn new( + writer: WritePtr, + compressor: Compressor, + block_size: usize, + ) -> io::Result { + let thread_builder = thread::Builder::new().name("docstore compressor thread".to_string()); + + // Data channel to send fs writes, to write only from current thread + let (data_sender, data_receiver) = sync_channel(3); + // Channel to send uncompressed data to compressor channel + let (block_sender, block_receiver) = sync_channel(3); + let thread_join_handle = thread_builder.spawn(move || { + let mut block_compressor = BlockCompressor::new(compressor, data_sender); + while let Ok(packet) = block_receiver.recv() { + match packet { + BlockCompressorMessage::AddBlock(block) => { + block_compressor.compress_block(block)?; + } + BlockCompressorMessage::Stack((store_reader, block)) => { + block_compressor.stack(block, store_reader)?; + } + } + } + block_compressor.close() + })?; + + Ok(StoreWriter { compressor, block_size, - doc: 0, - first_doc_in_block: 0, - offset_index_writer: SkipIndexBuilder::new(), - writer: CountingWriter::wrap(writer), + num_docs_in_current_block: 0, intermediary_buffer: Vec::new(), current_block: Vec::new(), - } + writer, + compressor_sender: block_sender, + compressor_thread_handle: thread_join_handle, + data_receiver, + }) } pub(crate) fn compressor(&self) -> Compressor { @@ -56,21 +96,53 @@ impl StoreWriter { self.intermediary_buffer.capacity() + self.current_block.capacity() } - /// Store bytes of a serialized document. - /// - /// The document id is implicitely the current number - /// of documents. - pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { - let doc_num_bytes = serialized_document.len(); - VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; - self.current_block.write_all(serialized_document)?; - self.doc += 1; + fn check_flush_block(&mut self) -> io::Result<()> { if self.current_block.len() > self.block_size { - self.write_and_compress_block()?; + let block = self.get_current_block(); + self.compressor_sender + .send(BlockCompressorMessage::AddBlock(block)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + self.fetch_writes_from_channel()?; } Ok(()) } + /// Try to empty the queue to write into the file. + /// + /// This is done in order to avoid writing from multiple threads into the file. + fn fetch_writes_from_channel(&mut self) -> io::Result<()> { + loop { + match self.data_receiver.try_recv() { + Ok(data) => { + self.writer.write_all(data.as_slice())?; + } + Err(err) => match err { + TryRecvError::Empty => { + break; + } + TryRecvError::Disconnected => { + return Err(io::Error::new( + io::ErrorKind::Other, + "compressor data channel unexpected closed".to_string(), + )); + } + }, + } + } + + Ok(()) + } + + fn get_current_block(&mut self) -> DocumentBlock { + let block = DocumentBlock { + data: self.current_block.to_owned(), + num_docs_in_block: self.num_docs_in_current_block, + }; + self.current_block.clear(); + self.num_docs_in_current_block = 0; + block + } + /// Store a new document. /// /// The document id is implicitely the current number @@ -85,28 +157,145 @@ impl StoreWriter { VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; self.current_block .write_all(&self.intermediary_buffer[..])?; - self.doc += 1; - if self.current_block.len() > self.block_size { - self.write_and_compress_block()?; + self.num_docs_in_current_block += 1; + self.check_flush_block()?; + Ok(()) + } + + /// Store bytes of a serialized document. + /// + /// The document id is implicitely the current number + /// of documents. + pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { + let doc_num_bytes = serialized_document.len(); + VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; + self.current_block.write_all(serialized_document)?; + self.num_docs_in_current_block += 1; + self.check_flush_block()?; + Ok(()) + } + + /// Stacks a store reader on top of the documents written so far. + /// This method is an optimization compared to iterating over the documents + /// in the store and adding them one by one, as the store's data will + /// not be decompressed and then recompressed. + pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { + self.check_flush_block()?; + let block = self.get_current_block(); + self.compressor_sender + .send(BlockCompressorMessage::Stack((store_reader, block))) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + Ok(()) + } + + /// Finalized the store writer. + /// + /// Compress the last unfinished block if any, + /// and serializes the skip list index on disc. + pub fn close(mut self) -> io::Result<()> { + let block = self.get_current_block(); + if !block.is_empty() { + self.compressor_sender + .send(BlockCompressorMessage::AddBlock(block)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; } + drop(self.compressor_sender); + + // Wait for remaining data on the channel to write + while let Ok(data) = self.data_receiver.recv() { + self.writer.write_all(data.as_slice())?; + } + + // The compressor thread should have finished already, since data_receiver stopped + // receiving + let (docstore_footer, offset_index_writer) = self + .compressor_thread_handle + .join() + .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??; + + offset_index_writer.serialize_into(&mut self.writer)?; + docstore_footer.serialize(&mut self.writer)?; + self.writer.terminate() + } +} + +/// BlockCompressor is seperated from StoreWriter, to be run in an own thread +pub struct BlockCompressor { + compressor: Compressor, + doc: DocId, + offset_index_writer: SkipIndexBuilder, + intermediary_buffer: Vec, + written_bytes: usize, + data_sender: SyncSender, +} + +struct DocumentBlock { + data: Vec, + num_docs_in_block: DocId, +} + +impl DocumentBlock { + fn is_empty(&self) -> bool { + self.data.is_empty() + } +} + +impl BlockCompressor { + fn new(compressor: Compressor, data_sender: SyncSender) -> Self { + Self { + compressor, + doc: 0, + offset_index_writer: SkipIndexBuilder::new(), + intermediary_buffer: Vec::new(), + written_bytes: 0, + data_sender, + } + } + + fn compress_block(&mut self, block: DocumentBlock) -> io::Result<()> { + assert!(block.num_docs_in_block > 0); + self.intermediary_buffer.clear(); + self.compressor + .compress_into(&block.data[..], &mut self.intermediary_buffer)?; + + let byte_range = self.written_bytes..self.written_bytes + self.intermediary_buffer.len(); + + self.data_sender + .send(OwnedBytes::new(self.intermediary_buffer.to_owned())) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + self.written_bytes += byte_range.len(); + + self.register_checkpoint(Checkpoint { + doc_range: self.doc..self.doc + block.num_docs_in_block, + byte_range, + }); Ok(()) } + fn register_checkpoint(&mut self, checkpoint: Checkpoint) { + self.offset_index_writer.insert(checkpoint.clone()); + self.doc = checkpoint.doc_range.end; + } + /// Stacks a store reader on top of the documents written so far. /// This method is an optimization compared to iterating over the documents /// in the store and adding them one by one, as the store's data will /// not be decompressed and then recompressed. - pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> { - if !self.current_block.is_empty() { - self.write_and_compress_block()?; + fn stack(&mut self, block: DocumentBlock, store_reader: StoreReader) -> io::Result<()> { + if !block.is_empty() { + self.compress_block(block)?; } - assert_eq!(self.first_doc_in_block, self.doc); let doc_shift = self.doc; - let start_shift = self.writer.written_bytes() as usize; + let start_shift = self.written_bytes; // just bulk write all of the block of the given reader. - self.writer - .write_all(store_reader.block_data()?.as_slice())?; + self.data_sender + .send(store_reader.block_data()?) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + self.written_bytes += store_reader.block_data()?.as_slice().len(); // concatenate the index of the `store_reader`, after translating // its start doc id and its start file offset. @@ -119,42 +308,12 @@ impl StoreWriter { } Ok(()) } + fn close(self) -> io::Result<(DocStoreFooter, SkipIndexBuilder)> { + drop(self.data_sender); - fn register_checkpoint(&mut self, checkpoint: Checkpoint) { - self.offset_index_writer.insert(checkpoint.clone()); - self.first_doc_in_block = checkpoint.doc_range.end; - self.doc = checkpoint.doc_range.end; - } - - fn write_and_compress_block(&mut self) -> io::Result<()> { - assert!(self.doc > 0); - self.intermediary_buffer.clear(); - self.compressor - .compress_into(&self.current_block[..], &mut self.intermediary_buffer)?; - let start_offset = self.writer.written_bytes() as usize; - self.writer.write_all(&self.intermediary_buffer)?; - let end_offset = self.writer.written_bytes() as usize; - let end_doc = self.doc; - self.register_checkpoint(Checkpoint { - doc_range: self.first_doc_in_block..end_doc, - byte_range: start_offset..end_offset, - }); - self.current_block.clear(); - Ok(()) - } - - /// Finalized the store writer. - /// - /// Compress the last unfinished block if any, - /// and serializes the skip list index on disc. - pub fn close(mut self) -> io::Result<()> { - if !self.current_block.is_empty() { - self.write_and_compress_block()?; - } - let header_offset: u64 = self.writer.written_bytes() as u64; + let header_offset: u64 = self.written_bytes as u64; let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); - self.offset_index_writer.write(&mut self.writer)?; - footer.serialize(&mut self.writer)?; - self.writer.terminate() + + Ok((footer, self.offset_index_writer)) } } From 7bf5962554ecc14401d70b9c2a47ff1356f7bbf0 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Thu, 16 Jun 2022 16:50:56 +0800 Subject: [PATCH 02/12] merge match, explicit type --- src/store/writer.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/store/writer.rs b/src/store/writer.rs index 140dd08a78..f68fce831e 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -56,9 +56,13 @@ impl StoreWriter { let thread_builder = thread::Builder::new().name("docstore compressor thread".to_string()); // Data channel to send fs writes, to write only from current thread - let (data_sender, data_receiver) = sync_channel(3); + let (data_sender, data_receiver): (SyncSender, Receiver) = + sync_channel(3); // Channel to send uncompressed data to compressor channel - let (block_sender, block_receiver) = sync_channel(3); + let (block_sender, block_receiver): ( + SyncSender, + Receiver, + ) = sync_channel(3); let thread_join_handle = thread_builder.spawn(move || { let mut block_compressor = BlockCompressor::new(compressor, data_sender); while let Ok(packet) = block_receiver.recv() { @@ -116,17 +120,15 @@ impl StoreWriter { Ok(data) => { self.writer.write_all(data.as_slice())?; } - Err(err) => match err { - TryRecvError::Empty => { - break; - } - TryRecvError::Disconnected => { - return Err(io::Error::new( - io::ErrorKind::Other, - "compressor data channel unexpected closed".to_string(), - )); - } - }, + Err(TryRecvError::Empty) => { + break; + } + Err(TryRecvError::Disconnected) => { + return Err(io::Error::new( + io::ErrorKind::Other, + "compressor data channel unexpected closed".to_string(), + )); + } } } From efabcbcdf5f23d8536177462ee630ed7648f99ab Mon Sep 17 00:00:00 2001 From: PSeitz Date: Fri, 17 Jun 2022 10:21:20 +0200 Subject: [PATCH 03/12] Update src/store/writer.rs Co-authored-by: Paul Masurel --- src/store/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/writer.rs b/src/store/writer.rs index f68fce831e..1a15ab4eaa 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -222,7 +222,7 @@ impl StoreWriter { } } -/// BlockCompressor is seperated from StoreWriter, to be run in an own thread +/// BlockCompressor is separated from StoreWriter, to be run in an own thread pub struct BlockCompressor { compressor: Compressor, doc: DocId, From 8b6647e90877ac0f110a543b355646cd625c6d38 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 21 Jun 2022 10:21:35 +0800 Subject: [PATCH 04/12] move writer to compressor thread --- common/src/writer.rs | 2 +- src/store/writer.rs | 100 +++++++++++-------------------------------- 2 files changed, 25 insertions(+), 77 deletions(-) diff --git a/common/src/writer.rs b/common/src/writer.rs index 20f56221d7..9b8b86908d 100644 --- a/common/src/writer.rs +++ b/common/src/writer.rs @@ -62,7 +62,7 @@ impl TerminatingWrite for CountingWriter { pub struct AntiCallToken(()); /// Trait used to indicate when no more write need to be done on a writer -pub trait TerminatingWrite: Write { +pub trait TerminatingWrite: Write + Send { /// Indicate that the writer will no longer be used. Internally call terminate_ref. fn terminate(mut self) -> io::Result<()> where Self: Sized { diff --git a/src/store/writer.rs b/src/store/writer.rs index 1a15ab4eaa..69273cd163 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,9 +1,8 @@ use std::io::{self, Write}; -use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError}; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::thread::{self, JoinHandle}; -use common::{BinarySerializable, VInt}; -use ownedbytes::OwnedBytes; +use common::{BinarySerializable, CountingWriter, VInt}; use super::compressors::Compressor; use super::footer::DocStoreFooter; @@ -28,14 +27,11 @@ pub struct StoreWriter { intermediary_buffer: Vec, current_block: Vec, - writer: WritePtr, - // the channel to communicate with the compressor thread. + // the channel to send data to the compressor thread. compressor_sender: SyncSender, - // the channel to receive data to write from the compressor thread. - data_receiver: Receiver, // the handle to check for errors on the thread - compressor_thread_handle: JoinHandle>, + compressor_thread_handle: JoinHandle>, } enum BlockCompressorMessage { @@ -55,16 +51,13 @@ impl StoreWriter { ) -> io::Result { let thread_builder = thread::Builder::new().name("docstore compressor thread".to_string()); - // Data channel to send fs writes, to write only from current thread - let (data_sender, data_receiver): (SyncSender, Receiver) = - sync_channel(3); // Channel to send uncompressed data to compressor channel let (block_sender, block_receiver): ( SyncSender, Receiver, ) = sync_channel(3); let thread_join_handle = thread_builder.spawn(move || { - let mut block_compressor = BlockCompressor::new(compressor, data_sender); + let mut block_compressor = BlockCompressor::new(compressor, writer); while let Ok(packet) = block_receiver.recv() { match packet { BlockCompressorMessage::AddBlock(block) => { @@ -84,10 +77,8 @@ impl StoreWriter { num_docs_in_current_block: 0, intermediary_buffer: Vec::new(), current_block: Vec::new(), - writer, compressor_sender: block_sender, compressor_thread_handle: thread_join_handle, - data_receiver, }) } @@ -106,35 +97,10 @@ impl StoreWriter { self.compressor_sender .send(BlockCompressorMessage::AddBlock(block)) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - self.fetch_writes_from_channel()?; } Ok(()) } - /// Try to empty the queue to write into the file. - /// - /// This is done in order to avoid writing from multiple threads into the file. - fn fetch_writes_from_channel(&mut self) -> io::Result<()> { - loop { - match self.data_receiver.try_recv() { - Ok(data) => { - self.writer.write_all(data.as_slice())?; - } - Err(TryRecvError::Empty) => { - break; - } - Err(TryRecvError::Disconnected) => { - return Err(io::Error::new( - io::ErrorKind::Other, - "compressor data channel unexpected closed".to_string(), - )); - } - } - } - - Ok(()) - } - fn get_current_block(&mut self) -> DocumentBlock { let block = DocumentBlock { data: self.current_block.to_owned(), @@ -204,21 +170,11 @@ impl StoreWriter { } drop(self.compressor_sender); - // Wait for remaining data on the channel to write - while let Ok(data) = self.data_receiver.recv() { - self.writer.write_all(data.as_slice())?; - } - - // The compressor thread should have finished already, since data_receiver stopped - // receiving - let (docstore_footer, offset_index_writer) = self - .compressor_thread_handle + self.compressor_thread_handle .join() .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??; - offset_index_writer.serialize_into(&mut self.writer)?; - docstore_footer.serialize(&mut self.writer)?; - self.writer.terminate() + Ok(()) } } @@ -228,8 +184,7 @@ pub struct BlockCompressor { doc: DocId, offset_index_writer: SkipIndexBuilder, intermediary_buffer: Vec, - written_bytes: usize, - data_sender: SyncSender, + writer: CountingWriter, } struct DocumentBlock { @@ -244,14 +199,13 @@ impl DocumentBlock { } impl BlockCompressor { - fn new(compressor: Compressor, data_sender: SyncSender) -> Self { + fn new(compressor: Compressor, writer: WritePtr) -> Self { Self { compressor, doc: 0, offset_index_writer: SkipIndexBuilder::new(), intermediary_buffer: Vec::new(), - written_bytes: 0, - data_sender, + writer: CountingWriter::wrap(writer), } } @@ -261,17 +215,13 @@ impl BlockCompressor { self.compressor .compress_into(&block.data[..], &mut self.intermediary_buffer)?; - let byte_range = self.written_bytes..self.written_bytes + self.intermediary_buffer.len(); - - self.data_sender - .send(OwnedBytes::new(self.intermediary_buffer.to_owned())) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - - self.written_bytes += byte_range.len(); + let start_offset = self.writer.written_bytes() as usize; + self.writer.write_all(&self.intermediary_buffer)?; + let end_offset = self.writer.written_bytes() as usize; self.register_checkpoint(Checkpoint { doc_range: self.doc..self.doc + block.num_docs_in_block, - byte_range, + byte_range: start_offset..end_offset, }); Ok(()) } @@ -290,14 +240,11 @@ impl BlockCompressor { self.compress_block(block)?; } let doc_shift = self.doc; - let start_shift = self.written_bytes; + let start_shift = self.writer.written_bytes() as usize; // just bulk write all of the block of the given reader. - self.data_sender - .send(store_reader.block_data()?) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - - self.written_bytes += store_reader.block_data()?.as_slice().len(); + self.writer + .write_all(store_reader.block_data()?.as_slice())?; // concatenate the index of the `store_reader`, after translating // its start doc id and its start file offset. @@ -310,12 +257,13 @@ impl BlockCompressor { } Ok(()) } - fn close(self) -> io::Result<(DocStoreFooter, SkipIndexBuilder)> { - drop(self.data_sender); - - let header_offset: u64 = self.written_bytes as u64; - let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); + fn close(mut self) -> io::Result<()> { + let header_offset: u64 = self.writer.written_bytes() as u64; + let docstore_footer = + DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); - Ok((footer, self.offset_index_writer)) + self.offset_index_writer.serialize_into(&mut self.writer)?; + docstore_footer.serialize(&mut self.writer)?; + self.writer.terminate() } } From 449594f67a5b5c7aaf59e0e363b4082dcab7651e Mon Sep 17 00:00:00 2001 From: PSeitz Date: Tue, 21 Jun 2022 07:48:12 +0200 Subject: [PATCH 05/12] Update src/store/writer.rs Co-authored-by: Paul Masurel --- src/store/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/writer.rs b/src/store/writer.rs index 69273cd163..735d01ed6a 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -49,7 +49,7 @@ impl StoreWriter { compressor: Compressor, block_size: usize, ) -> io::Result { - let thread_builder = thread::Builder::new().name("docstore compressor thread".to_string()); + let thread_builder = thread::Builder::new().name("docstore-compressor-thread".to_string()); // Channel to send uncompressed data to compressor channel let (block_sender, block_receiver): ( From 0135fbc4c83483fa9044ceda0f2fbc2bc365e52a Mon Sep 17 00:00:00 2001 From: PSeitz Date: Tue, 21 Jun 2022 07:48:19 +0200 Subject: [PATCH 06/12] Update src/store/writer.rs Co-authored-by: Paul Masurel --- src/store/writer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/store/writer.rs b/src/store/writer.rs index 735d01ed6a..27fc2351a0 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -68,7 +68,8 @@ impl StoreWriter { } } } - block_compressor.close() + block_compressor.close()?; + Ok(()) })?; Ok(StoreWriter { From 79e42d4a6da02cfed70641188c70777b52d98859 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Tue, 21 Jun 2022 09:21:10 +0200 Subject: [PATCH 07/12] Update src/store/writer.rs Co-authored-by: Paul Masurel --- src/store/writer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/store/writer.rs b/src/store/writer.rs index 27fc2351a0..cd80cd9082 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -92,6 +92,7 @@ impl StoreWriter { self.intermediary_buffer.capacity() + self.current_block.capacity() } + /// Checks if the current block is full, and if so, compresses and flushes it. fn check_flush_block(&mut self) -> io::Result<()> { if self.current_block.len() > self.block_size { let block = self.get_current_block(); From 0bc6b4a11768a09d8373e50473606d09530eab90 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Tue, 21 Jun 2022 16:10:48 +0800 Subject: [PATCH 08/12] renames and refactoring --- src/store/writer.rs | 54 ++++++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/src/store/writer.rs b/src/store/writer.rs index cd80cd9082..244bf72fc6 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -29,14 +29,13 @@ pub struct StoreWriter { // the channel to send data to the compressor thread. compressor_sender: SyncSender, - // the handle to check for errors on the thread compressor_thread_handle: JoinHandle>, } enum BlockCompressorMessage { AddBlock(DocumentBlock), - Stack((StoreReader, DocumentBlock)), + Stack(StoreReader), } impl StoreWriter { @@ -61,10 +60,10 @@ impl StoreWriter { while let Ok(packet) = block_receiver.recv() { match packet { BlockCompressorMessage::AddBlock(block) => { - block_compressor.compress_block(block)?; + block_compressor.compress_block_and_write(block)?; } - BlockCompressorMessage::Stack((store_reader, block)) => { - block_compressor.stack(block, store_reader)?; + BlockCompressorMessage::Stack(store_reader) => { + block_compressor.stack(store_reader)?; } } } @@ -95,22 +94,25 @@ impl StoreWriter { /// Checks if the current block is full, and if so, compresses and flushes it. fn check_flush_block(&mut self) -> io::Result<()> { if self.current_block.len() > self.block_size { - let block = self.get_current_block(); - self.compressor_sender - .send(BlockCompressorMessage::AddBlock(block)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + self.send_current_block_to_compressor()?; } Ok(()) } - fn get_current_block(&mut self) -> DocumentBlock { + /// Flushes current uncompressed block and sends to compressor. + fn send_current_block_to_compressor(&mut self) -> io::Result<()> { let block = DocumentBlock { data: self.current_block.to_owned(), num_docs_in_block: self.num_docs_in_current_block, }; self.current_block.clear(); self.num_docs_in_current_block = 0; - block + if !block.is_empty() { + self.compressor_sender + .send(BlockCompressorMessage::AddBlock(block)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + } + Ok(()) } /// Store a new document. @@ -150,10 +152,10 @@ impl StoreWriter { /// in the store and adding them one by one, as the store's data will /// not be decompressed and then recompressed. pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { - self.check_flush_block()?; - let block = self.get_current_block(); + // We flush the current block first before stacking + self.send_current_block_to_compressor()?; self.compressor_sender - .send(BlockCompressorMessage::Stack((store_reader, block))) + .send(BlockCompressorMessage::Stack(store_reader)) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; Ok(()) @@ -164,12 +166,7 @@ impl StoreWriter { /// Compress the last unfinished block if any, /// and serializes the skip list index on disc. pub fn close(mut self) -> io::Result<()> { - let block = self.get_current_block(); - if !block.is_empty() { - self.compressor_sender - .send(BlockCompressorMessage::AddBlock(block)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - } + self.send_current_block_to_compressor()?; drop(self.compressor_sender); self.compressor_thread_handle @@ -183,7 +180,7 @@ impl StoreWriter { /// BlockCompressor is separated from StoreWriter, to be run in an own thread pub struct BlockCompressor { compressor: Compressor, - doc: DocId, + first_doc_in_block: DocId, offset_index_writer: SkipIndexBuilder, intermediary_buffer: Vec, writer: CountingWriter, @@ -204,14 +201,14 @@ impl BlockCompressor { fn new(compressor: Compressor, writer: WritePtr) -> Self { Self { compressor, - doc: 0, + first_doc_in_block: 0, offset_index_writer: SkipIndexBuilder::new(), intermediary_buffer: Vec::new(), writer: CountingWriter::wrap(writer), } } - fn compress_block(&mut self, block: DocumentBlock) -> io::Result<()> { + fn compress_block_and_write(&mut self, block: DocumentBlock) -> io::Result<()> { assert!(block.num_docs_in_block > 0); self.intermediary_buffer.clear(); self.compressor @@ -222,7 +219,7 @@ impl BlockCompressor { let end_offset = self.writer.written_bytes() as usize; self.register_checkpoint(Checkpoint { - doc_range: self.doc..self.doc + block.num_docs_in_block, + doc_range: self.first_doc_in_block..self.first_doc_in_block + block.num_docs_in_block, byte_range: start_offset..end_offset, }); Ok(()) @@ -230,18 +227,15 @@ impl BlockCompressor { fn register_checkpoint(&mut self, checkpoint: Checkpoint) { self.offset_index_writer.insert(checkpoint.clone()); - self.doc = checkpoint.doc_range.end; + self.first_doc_in_block = checkpoint.doc_range.end; } /// Stacks a store reader on top of the documents written so far. /// This method is an optimization compared to iterating over the documents /// in the store and adding them one by one, as the store's data will /// not be decompressed and then recompressed. - fn stack(&mut self, block: DocumentBlock, store_reader: StoreReader) -> io::Result<()> { - if !block.is_empty() { - self.compress_block(block)?; - } - let doc_shift = self.doc; + fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { + let doc_shift = self.first_doc_in_block; let start_shift = self.writer.written_bytes() as usize; // just bulk write all of the block of the given reader. From 2b713f0977f139afc1a71fb1b9464edc61394d16 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Thu, 23 Jun 2022 09:11:22 +0200 Subject: [PATCH 09/12] Update src/store/writer.rs Co-authored-by: Paul Masurel --- src/store/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/writer.rs b/src/store/writer.rs index 244bf72fc6..465ceaea99 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -126,7 +126,7 @@ impl StoreWriter { // intermediary_buffer due to the borrow checker // a new buffer costs ~1% indexing performance let doc_num_bytes = self.intermediary_buffer.len(); - VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; + VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); self.current_block .write_all(&self.intermediary_buffer[..])?; self.num_docs_in_current_block += 1; From c3220bece05e2c99226eab1d81236211171b9a7c Mon Sep 17 00:00:00 2001 From: PSeitz Date: Thu, 23 Jun 2022 09:11:36 +0200 Subject: [PATCH 10/12] Update src/store/writer.rs Co-authored-by: Paul Masurel --- src/store/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/writer.rs b/src/store/writer.rs index 465ceaea99..dd6bd903e0 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -141,7 +141,7 @@ impl StoreWriter { pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { let doc_num_bytes = serialized_document.len(); VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; - self.current_block.write_all(serialized_document)?; + self.current_block.extend_from_slice(serialized_document); self.num_docs_in_current_block += 1; self.check_flush_block()?; Ok(()) From ad76d1100804f8322fb6050183a59ada76ab4e9c Mon Sep 17 00:00:00 2001 From: PSeitz Date: Thu, 23 Jun 2022 09:11:48 +0200 Subject: [PATCH 11/12] Update src/store/writer.rs Co-authored-by: Paul Masurel --- src/store/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/writer.rs b/src/store/writer.rs index dd6bd903e0..f15c5a8152 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -140,7 +140,7 @@ impl StoreWriter { /// of documents. pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { let doc_num_bytes = serialized_document.len(); - VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; + VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); self.current_block.extend_from_slice(serialized_document); self.num_docs_in_current_block += 1; self.check_flush_block()?; From 9baefbe2ab8695952f3db5dca30b8b543b00bf38 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Thu, 23 Jun 2022 09:24:49 +0200 Subject: [PATCH 12/12] Update src/store/writer.rs Co-authored-by: Paul Masurel --- src/store/writer.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/src/store/writer.rs b/src/store/writer.rs index f15c5a8152..3327dd1e97 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -101,17 +101,20 @@ impl StoreWriter { /// Flushes current uncompressed block and sends to compressor. fn send_current_block_to_compressor(&mut self) -> io::Result<()> { + // We don't do anything if the current block is empty to begin with. + if self.current_block.is_empty() { + return Ok(()); + } let block = DocumentBlock { data: self.current_block.to_owned(), num_docs_in_block: self.num_docs_in_current_block, }; self.current_block.clear(); self.num_docs_in_current_block = 0; - if !block.is_empty() { - self.compressor_sender - .send(BlockCompressorMessage::AddBlock(block)) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - } + self.compressor_sender + .send(BlockCompressorMessage::AddBlock(block)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + Ok(()) } @@ -191,12 +194,6 @@ struct DocumentBlock { num_docs_in_block: DocId, } -impl DocumentBlock { - fn is_empty(&self) -> bool { - self.data.is_empty() - } -} - impl BlockCompressor { fn new(compressor: Compressor, writer: WritePtr) -> Self { Self {