Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use separate thread to compress block store #1389

Merged
merged 12 commits into from Jun 23, 2022
2 changes: 1 addition & 1 deletion common/src/writer.rs
Expand Up @@ -62,7 +62,7 @@ impl<W: TerminatingWrite> TerminatingWrite for CountingWriter<W> {
pub struct AntiCallToken(());

/// Trait used to indicate when no more write need to be done on a writer
pub trait TerminatingWrite: Write {
pub trait TerminatingWrite: Write + Send {
/// Indicate that the writer will no longer be used. Internally call terminate_ref.
fn terminate(mut self) -> io::Result<()>
where Self: Sized {
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/merger.rs
Expand Up @@ -1081,7 +1081,7 @@ impl IndexMerger {
store_writer.store_bytes(&doc_bytes)?;
}
} else {
store_writer.stack(&store_reader)?;
store_writer.stack(store_reader)?;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/segment_serializer.rs
Expand Up @@ -42,7 +42,7 @@ impl SegmentSerializer {
let blocksize = segment.index().settings().docstore_blocksize;
Ok(SegmentSerializer {
segment,
store_writer: StoreWriter::new(store_write, compressor, blocksize),
store_writer: StoreWriter::new(store_write, compressor, blocksize)?,
fast_field_serializer,
fieldnorms_serializer: Some(fieldnorms_serializer),
postings_serializer,
Expand Down
2 changes: 1 addition & 1 deletion src/indexer/segment_writer.rs
Expand Up @@ -382,7 +382,7 @@ fn remap_and_write(
let block_size = serializer.segment().index().settings().docstore_blocksize;
let old_store_writer = std::mem::replace(
&mut serializer.store_writer,
StoreWriter::new(store_write, compressor, block_size),
StoreWriter::new(store_write, compressor, block_size)?,
);
old_store_writer.close()?;
let store_read = StoreReader::open(
Expand Down
10 changes: 5 additions & 5 deletions src/store/index/mod.rs
Expand Up @@ -54,7 +54,7 @@ mod tests {
fn test_skip_index_empty() -> io::Result<()> {
let mut output: Vec<u8> = Vec::new();
let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new();
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert!(skip_cursor.next().is_none());
Expand All @@ -70,7 +70,7 @@ mod tests {
byte_range: 0..3,
};
skip_index_builder.insert(checkpoint.clone());
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
let mut skip_cursor = skip_index.checkpoints();
assert_eq!(skip_cursor.next(), Some(checkpoint));
Expand Down Expand Up @@ -108,7 +108,7 @@ mod tests {
for checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint.clone());
}
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;

let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output));
assert_eq!(
Expand Down Expand Up @@ -167,7 +167,7 @@ mod tests {
for checkpoint in &checkpoints {
skip_index_builder.insert(checkpoint.clone());
}
skip_index_builder.write(&mut output)?;
skip_index_builder.serialize_into(&mut output)?;
assert_eq!(output.len(), 4035);
let resulting_checkpoints: Vec<Checkpoint> = SkipIndex::open(OwnedBytes::new(output))
.checkpoints()
Expand Down Expand Up @@ -238,7 +238,7 @@ mod tests {
skip_index_builder.insert(checkpoint);
}
let mut buffer = Vec::new();
skip_index_builder.write(&mut buffer).unwrap();
skip_index_builder.serialize_into(&mut buffer).unwrap();
let skip_index = SkipIndex::open(OwnedBytes::new(buffer));
let iter_checkpoints: Vec<Checkpoint> = skip_index.checkpoints().collect();
assert_eq!(&checkpoints[..], &iter_checkpoints[..]);
Expand Down
2 changes: 1 addition & 1 deletion src/store/index/skip_index_builder.rs
Expand Up @@ -87,7 +87,7 @@ impl SkipIndexBuilder {
}
}

pub fn write<W: Write>(mut self, output: &mut W) -> io::Result<()> {
pub fn serialize_into<W: Write>(mut self, output: &mut W) -> io::Result<()> {
let mut last_pointer = None;
for skip_layer in self.layers.iter_mut() {
if let Some(checkpoint) = last_pointer {
Expand Down
2 changes: 1 addition & 1 deletion src/store/mod.rs
Expand Up @@ -88,7 +88,7 @@ pub mod tests {
schema_builder.add_text_field("title", TextOptions::default().set_stored());
let schema = schema_builder.build();
{
let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap();
for i in 0..num_docs {
let mut doc = Document::default();
doc.add_field_value(field_body, LOREM.to_string());
Expand Down