Minor changes in indexing. (#1285)
fulmicoton committed Feb 21, 2022
1 parent 9815067 commit d37633e
Showing 5 changed files with 140 additions and 107 deletions.
21 changes: 13 additions & 8 deletions benches/index-bench.rs
@@ -4,6 +4,7 @@ use tantivy::schema::{INDEXED, STORED, STRING, TEXT};
 use tantivy::Index;
 
 const HDFS_LOGS: &str = include_str!("hdfs.json");
+const NUM_REPEATS: usize = 10;
 
 pub fn hdfs_index_benchmark(c: &mut Criterion) {
     let schema = {
@@ -27,7 +28,7 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
         b.iter(|| {
             let index = Index::create_in_ram(schema.clone());
             let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
-            for _ in 0..10 {
+            for _ in 0..NUM_REPEATS {
                 for doc_json in HDFS_LOGS.trim().split("\n") {
                     let doc = schema.parse_document(doc_json).unwrap();
                     index_writer.add_document(doc).unwrap();
@@ -39,7 +40,7 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
         b.iter(|| {
             let index = Index::create_in_ram(schema.clone());
             let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
-            for _ in 0..10 {
+            for _ in 0..NUM_REPEATS {
                 for doc_json in HDFS_LOGS.trim().split("\n") {
                     let doc = schema.parse_document(doc_json).unwrap();
                     index_writer.add_document(doc).unwrap();
@@ -52,19 +53,23 @@ pub fn hdfs_index_benchmark(c: &mut Criterion) {
         b.iter(|| {
             let index = Index::create_in_ram(schema_with_store.clone());
             let index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
-            for doc_json in HDFS_LOGS.trim().split("\n") {
-                let doc = schema.parse_document(doc_json).unwrap();
-                index_writer.add_document(doc).unwrap();
+            for _ in 0..NUM_REPEATS {
+                for doc_json in HDFS_LOGS.trim().split("\n") {
+                    let doc = schema.parse_document(doc_json).unwrap();
+                    index_writer.add_document(doc).unwrap();
+                }
             }
         })
     });
     group.bench_function("index-hdfs-with-commit-with-docstore", |b| {
         b.iter(|| {
             let index = Index::create_in_ram(schema_with_store.clone());
             let mut index_writer = index.writer_with_num_threads(1, 100_000_000).unwrap();
-            for doc_json in HDFS_LOGS.trim().split("\n") {
-                let doc = schema.parse_document(doc_json).unwrap();
-                index_writer.add_document(doc).unwrap();
+            for _ in 0..NUM_REPEATS {
+                for doc_json in HDFS_LOGS.trim().split("\n") {
+                    let doc = schema.parse_document(doc_json).unwrap();
+                    index_writer.add_document(doc).unwrap();
+                }
             }
             index_writer.commit().unwrap();
         })
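Note on the change above: the repeat count is hoisted into NUM_REPEATS, and the two docstore benchmarks now also index the dataset NUM_REPEATS times, so all four index-hdfs benchmarks perform the same amount of indexing work per iteration. For readers unfamiliar with the harness, a minimal sketch of how a Criterion benchmark like this is typically wired up and run follows; the criterion_group!/criterion_main! wiring, the group name, and sample_size(20) are assumptions for illustration, not part of this diff.

use criterion::{criterion_group, criterion_main, Criterion};

pub fn hdfs_index_benchmark(c: &mut Criterion) {
    // Group the index-hdfs benchmarks together and cap the sample count,
    // since each iteration indexes the whole dataset NUM_REPEATS times.
    let mut group = c.benchmark_group("index-hdfs");
    group.sample_size(20); // assumed value for illustration
    group.bench_function("index-hdfs-no-commit", |b| {
        b.iter(|| {
            // create the in-RAM index and add every HDFS_LOGS document
            // NUM_REPEATS times, exactly as in the diff above
        })
    });
    group.finish();
}

criterion_group!(benches, hdfs_index_benchmark);
criterion_main!(benches);

// Run with: cargo bench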
35 changes: 15 additions & 20 deletions src/indexer/segment_writer.rs
@@ -55,7 +55,7 @@ fn remap_doc_opstamps(
 /// The segment is layed on disk when the segment gets `finalized`.
 pub struct SegmentWriter {
     pub(crate) max_doc: DocId,
-    pub(crate) indexing_context: IndexingContext,
+    pub(crate) ctx: IndexingContext,
     pub(crate) per_field_postings_writers: PerFieldPostingsWriter,
     pub(crate) segment_serializer: SegmentSerializer,
     pub(crate) fast_field_writers: FastFieldsWriter,
@@ -101,7 +101,7 @@ impl SegmentWriter {
             .collect();
         Ok(SegmentWriter {
             max_doc: 0,
-            indexing_context: IndexingContext::new(table_size),
+            ctx: IndexingContext::new(table_size),
             per_field_postings_writers,
             fieldnorms_writer: FieldNormsWriter::for_schema(&schema),
             segment_serializer,
@@ -130,7 +130,7 @@ impl SegmentWriter {
             .transpose()?;
         remap_and_write(
             &self.per_field_postings_writers,
-            self.indexing_context,
+            self.ctx,
             &self.fast_field_writers,
             &self.fieldnorms_writer,
             &self.schema,
@@ -142,7 +142,7 @@
     }
 
     pub fn mem_usage(&self) -> usize {
-        self.indexing_context.mem_usage()
+        self.ctx.mem_usage()
             + self.fieldnorms_writer.mem_usage()
             + self.fast_field_writers.mem_usage()
             + self.segment_serializer.mem_usage()
@@ -162,8 +162,7 @@ impl SegmentWriter {
             if !field_entry.is_indexed() {
                 continue;
             }
-            let (term_buffer, indexing_context) =
-                (&mut self.term_buffer, &mut self.indexing_context);
+            let (term_buffer, ctx) = (&mut self.term_buffer, &mut self.ctx);
             let postings_writer: &mut dyn PostingsWriter =
                 self.per_field_postings_writers.get_for_field_mut(field);
             match *field_entry.field_type() {
@@ -177,12 +176,8 @@ impl SegmentWriter {
                             .token_stream(facet_str)
                             .process(&mut |token| {
                                 term_buffer.set_text(&token.text);
-                                let unordered_term_id = postings_writer.subscribe(
-                                    doc_id,
-                                    0u32,
-                                    term_buffer,
-                                    indexing_context,
-                                );
+                                let unordered_term_id =
+                                    postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
                                 // TODO pass indexing context directly in subscribe function
                                 unordered_term_id_opt = Some(unordered_term_id);
                             });
@@ -229,7 +224,7 @@ impl SegmentWriter {
                             field,
                             &mut *token_stream,
                             term_buffer,
-                            indexing_context,
+                            ctx,
                             &mut indexing_position,
                         );
                     }
@@ -241,39 +236,39 @@ impl SegmentWriter {
                         term_buffer.set_field(Type::U64, field);
                         let u64_val = value.as_u64().ok_or_else(make_schema_error)?;
                         term_buffer.set_u64(u64_val);
-                        postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
+                        postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
                     }
                 }
                 FieldType::Date(_) => {
                     for value in values {
                         term_buffer.set_field(Type::Date, field);
                         let date_val = value.as_date().ok_or_else(make_schema_error)?;
                         term_buffer.set_i64(date_val.timestamp());
-                        postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
+                        postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
                     }
                 }
                 FieldType::I64(_) => {
                     for value in values {
                         term_buffer.set_field(Type::I64, field);
                         let i64_val = value.as_i64().ok_or_else(make_schema_error)?;
                         term_buffer.set_i64(i64_val);
-                        postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
+                        postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
                     }
                 }
                 FieldType::F64(_) => {
                     for value in values {
                         term_buffer.set_field(Type::F64, field);
                         let f64_val = value.as_f64().ok_or_else(make_schema_error)?;
                         term_buffer.set_f64(f64_val);
-                        postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
+                        postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
                     }
                 }
                 FieldType::Bytes(_) => {
                     for value in values {
                         term_buffer.set_field(Type::Bytes, field);
                         let bytes = value.as_bytes().ok_or_else(make_schema_error)?;
                         term_buffer.set_bytes(bytes);
-                        postings_writer.subscribe(doc_id, 0u32, term_buffer, indexing_context);
+                        postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx);
                     }
                 }
             }
@@ -324,7 +319,7 @@ impl SegmentWriter {
 /// `doc_id_map` is used to map to the new doc_id order.
 fn remap_and_write(
     per_field_postings_writers: &PerFieldPostingsWriter,
-    indexing_context: IndexingContext,
+    ctx: IndexingContext,
     fast_field_writers: &FastFieldsWriter,
     fieldnorms_writer: &FieldNormsWriter,
     schema: &Schema,
@@ -339,7 +334,7 @@ fn remap_and_write(
         .open_read(SegmentComponent::FieldNorms)?;
     let fieldnorm_readers = FieldNormReaders::open(fieldnorm_data)?;
     let term_ord_map = serialize_postings(
-        indexing_context,
+        ctx,
         per_field_postings_writers,
         fieldnorm_readers,
         doc_id_map,
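The diff in this file is essentially a rename: the SegmentWriter field indexing_context and every parameter of that name become ctx, with call sites collapsing onto one line where the shorter name fits. Judging from the usages visible above (IndexingContext::new(table_size), ctx.term_index, ctx.arena, ctx.mem_usage()), the context bundles the term hashmap and the memory arena backing the posting recorders. A simplified, self-contained stand-in follows; it is illustrative only and uses std types in place of tantivy's internal arena and hashmap structures.

use std::collections::HashMap;

/// Simplified stand-in for an indexing context: a term hashmap plus a memory arena.
/// (Illustrative only; tantivy's real IndexingContext is built on its own stacker types.)
struct IndexingContext {
    term_index: HashMap<Vec<u8>, usize>, // term bytes -> address of its recorder in the arena
    arena: Vec<u8>,                      // bump-allocated storage for posting recorders
}

impl IndexingContext {
    fn new(table_size: usize) -> Self {
        IndexingContext {
            term_index: HashMap::with_capacity(table_size),
            arena: Vec::new(),
        }
    }

    /// Rough memory accounting; SegmentWriter::mem_usage above sums this together
    /// with the fieldnorm, fast-field and serializer usage.
    fn mem_usage(&self) -> usize {
        self.arena.capacity()
            + self.term_index.capacity() * std::mem::size_of::<(Vec<u8>, usize)>()
    }
}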
38 changes: 15 additions & 23 deletions src/postings/postings_writer.rs
@@ -49,16 +49,16 @@ fn make_field_partition(
 /// It pushes all term, one field at a time, towards the
 /// postings serializer.
 pub(crate) fn serialize_postings(
-    indexing_context: IndexingContext,
+    ctx: IndexingContext,
     per_field_postings_writers: &PerFieldPostingsWriter,
     fieldnorm_readers: FieldNormReaders,
     doc_id_map: Option<&DocIdMapping>,
     schema: &Schema,
     serializer: &mut InvertedIndexSerializer,
 ) -> crate::Result<HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>>> {
     let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
-        Vec::with_capacity(indexing_context.term_index.len());
-    term_offsets.extend(indexing_context.term_index.iter());
+        Vec::with_capacity(ctx.term_index.len());
+    term_offsets.extend(ctx.term_index.iter());
     term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
 
     let mut unordered_term_mappings: HashMap<Field, FnvHashMap<UnorderedTermId, TermOrdinal>> =
@@ -94,7 +94,7 @@ pub(crate) fn serialize_postings(
         postings_writer.serialize(
             &term_offsets[byte_offsets],
             doc_id_map,
-            &indexing_context,
+            &ctx,
             &mut field_serializer,
         )?;
         field_serializer.close()?;
@@ -118,14 +118,14 @@ pub(crate) trait PostingsWriter {
     /// * doc - the document id
     /// * pos - the term position (expressed in tokens)
     /// * term - the term
-    /// * indexing_context - Contains a term hashmap and a memory arena to store all necessary
-    ///   posting list information.
+    /// * ctx - Contains a term hashmap and a memory arena to store all necessary posting list
+    ///   information.
     fn subscribe(
         &mut self,
         doc: DocId,
         pos: u32,
         term: &Term,
-        indexing_context: &mut IndexingContext,
+        ctx: &mut IndexingContext,
     ) -> UnorderedTermId;
 
     /// Serializes the postings on disk.
@@ -134,7 +134,7 @@ pub(crate) trait PostingsWriter {
         &self,
         term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
         doc_id_map: Option<&DocIdMapping>,
-        indexing_context: &IndexingContext,
+        ctx: &IndexingContext,
         serializer: &mut FieldSerializer,
     ) -> io::Result<()>;
 
@@ -145,7 +145,7 @@
         field: Field,
         token_stream: &mut dyn TokenStream,
         term_buffer: &mut Term,
-        indexing_context: &mut IndexingContext,
+        ctx: &mut IndexingContext,
         indexing_position: &mut IndexingPosition,
     ) {
         term_buffer.set_field(Type::Str, field);
@@ -165,7 +165,7 @@ pub(crate) trait PostingsWriter {
             term_buffer.set_text(token.text.as_str());
             let start_position = indexing_position.end_position + token.position as u32;
             end_position = start_position + token.position_length as u32;
-            self.subscribe(doc_id, start_position, term_buffer, indexing_context);
+            self.subscribe(doc_id, start_position, term_buffer, ctx);
             num_tokens += 1;
         });
         indexing_position.end_position = end_position + POSITION_GAP;
@@ -203,14 +203,11 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
         doc: DocId,
         position: u32,
         term: &Term,
-        indexing_context: &mut IndexingContext,
+        ctx: &mut IndexingContext,
     ) -> UnorderedTermId {
         debug_assert!(term.as_slice().len() >= 4);
         self.total_num_tokens += 1;
-        let (term_index, arena) = (
-            &mut indexing_context.term_index,
-            &mut indexing_context.arena,
-        );
+        let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena);
         term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option<Rec>| {
             if let Some(mut recorder) = opt_recorder {
                 let current_doc = recorder.current_doc();
@@ -233,20 +230,15 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
         &self,
         term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
         doc_id_map: Option<&DocIdMapping>,
-        indexing_context: &IndexingContext,
+        ctx: &IndexingContext,
         serializer: &mut FieldSerializer,
     ) -> io::Result<()> {
         let mut buffer_lender = BufferLender::default();
         for (term, addr, _) in term_addrs {
-            let recorder: Rec = indexing_context.term_index.read(*addr);
+            let recorder: Rec = ctx.term_index.read(*addr);
             let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
             serializer.new_term(term.value_bytes(), term_doc_freq)?;
-            recorder.serialize(
-                &indexing_context.arena,
-                doc_id_map,
-                serializer,
-                &mut buffer_lender,
-            );
+            recorder.serialize(&ctx.arena, doc_id_map, serializer, &mut buffer_lender);
             serializer.close_term()?;
         }
         Ok(())
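For context on the subscribe path touched above: each call looks the term up in ctx.term_index and either updates the existing posting recorder or creates a fresh one (the mutate_or_create call), recording the (doc, position) pair. A simplified sketch of that mutate-or-create pattern using std's entry API follows; it is illustrative only, as the real recorders live in the memory arena and the hashmap hands back an UnorderedTermId rather than storing postings inline.

use std::collections::HashMap;

/// Toy posting recorder: keeps the posting list directly in the map.
#[derive(Default)]
struct Recorder {
    postings: Vec<(u32, u32)>, // (doc id, position)
}

/// Mutate the recorder if the term was seen before, create it otherwise.
fn subscribe(term_index: &mut HashMap<Vec<u8>, Recorder>, term: &[u8], doc: u32, pos: u32) {
    term_index
        .entry(term.to_vec())
        .or_default()
        .postings
        .push((doc, pos));
}

fn main() {
    let mut term_index: HashMap<Vec<u8>, Recorder> = HashMap::new();
    subscribe(&mut term_index, b"hadoop", 0, 3);
    subscribe(&mut term_index, b"hadoop", 1, 0);
    assert_eq!(term_index[b"hadoop".as_slice()].postings.len(), 2);
}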
