Skip to content

Commit

Permalink
remove searcher pool and make Searcher cloneable (#1411)
Browse files Browse the repository at this point in the history
* remove searcher pool and make Searcher cloneable

closes #1410

* use SearcherInner in InnerIndexReader
  • Loading branch information
PSeitz committed Jul 12, 2022
1 parent a4be239 commit 23fe73a
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 410 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Tantivy 0.19
- Updated [Date Field Type](https://github.com/quickwit-oss/tantivy/pull/1396)
The `DateTime` type has been updated to hold timestamps with microseconds precision.
`DateOptions` and `DatePrecision` have been added to configure Date fields. The precision is used as a hint for fast value compression; seconds precision is used everywhere else (i.e., terms, indexing).

- Remove Searcher pool and make `Searcher` cloneable.

Tantivy 0.18
================================
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ measure_time = "0.8.2"
pretty_assertions = "1.2.1"
serde_cbor = { version = "0.11.2", optional = true }
async-trait = "0.1.53"
arc-swap = "1.5.0"

[target.'cfg(windows)'.dependencies]
winapi = "0.3.9"
Expand Down
6 changes: 1 addition & 5 deletions examples/warmer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,7 @@ fn main() -> tantivy::Result<()> {
let warmers: Vec<Weak<dyn Warmer>> = vec![Arc::downgrade(
&(price_dynamic_column.clone() as Arc<dyn Warmer>),
)];
let reader: IndexReader = index
.reader_builder()
.warmers(warmers)
.num_searchers(1)
.try_into()?;
let reader: IndexReader = index.reader_builder().warmers(warmers).try_into()?;
reader.reload()?;

let query_parser = QueryParser::for_index(&index, vec![text]);
Expand Down
2 changes: 1 addition & 1 deletion src/collector/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ impl SegmentCollector for BytesFastFieldSegmentCollector {
}
}

fn make_test_searcher() -> crate::Result<crate::LeasedItem<Searcher>> {
fn make_test_searcher() -> crate::Result<Searcher> {
let schema = Schema::builder().build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_for_tests()?;
Expand Down
98 changes: 59 additions & 39 deletions src/core/searcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::{fmt, io};

use crate::collector::Collector;
Expand Down Expand Up @@ -62,53 +63,28 @@ impl SearcherGeneration {
///
/// It guarantees that the `Segment` will not be removed before
/// the destruction of the `Searcher`.
#[derive(Clone)]
pub struct Searcher {
schema: Schema,
index: Index,
segment_readers: Vec<SegmentReader>,
store_readers: Vec<StoreReader>,
generation: TrackedObject<SearcherGeneration>,
inner: Arc<SearcherInner>,
}

impl Searcher {
/// Creates a new `Searcher`
pub(crate) fn new(
schema: Schema,
index: Index,
segment_readers: Vec<SegmentReader>,
generation: TrackedObject<SearcherGeneration>,
doc_store_cache_size: usize,
) -> io::Result<Searcher> {
let store_readers: Vec<StoreReader> = segment_readers
.iter()
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
.collect::<io::Result<Vec<_>>>()?;

Ok(Searcher {
schema,
index,
segment_readers,
store_readers,
generation,
})
}

/// Returns the `Index` associated to the `Searcher`
pub fn index(&self) -> &Index {
&self.index
&self.inner.index
}

/// [SearcherGeneration] which identifies the version of the snapshot held by this `Searcher`.
pub fn generation(&self) -> &SearcherGeneration {
self.generation.as_ref()
self.inner.generation.as_ref()
}

/// Fetches a document from tantivy's store given a `DocAddress`.
///
/// The searcher uses the segment ordinal to route the
/// request to the right `Segment`.
pub fn doc(&self, doc_address: DocAddress) -> crate::Result<Document> {
let store_reader = &self.store_readers[doc_address.segment_ord as usize];
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
store_reader.get(doc_address.doc_id)
}

Expand All @@ -117,6 +93,7 @@ impl Searcher {
/// Aggregates the sum for each segment store reader.
pub fn doc_store_cache_stats(&self) -> CacheStats {
let cache_stats: CacheStats = self
.inner
.store_readers
.iter()
.map(|reader| reader.cache_stats())
Expand All @@ -127,18 +104,19 @@ impl Searcher {
/// Fetches a document in an asynchronous manner.
#[cfg(feature = "quickwit")]
pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
let store_reader = &self.store_readers[doc_address.segment_ord as usize];
let store_reader = &self.inner.store_readers[doc_address.segment_ord as usize];
store_reader.get_async(doc_address.doc_id).await
}

/// Access the schema associated to the index of this searcher.
pub fn schema(&self) -> &Schema {
&self.schema
&self.inner.schema
}

/// Returns the overall number of documents in the index.
pub fn num_docs(&self) -> u64 {
self.segment_readers
self.inner
.segment_readers
.iter()
.map(|segment_reader| u64::from(segment_reader.num_docs()))
.sum::<u64>()
Expand All @@ -148,7 +126,7 @@ impl Searcher {
/// the given term.
pub fn doc_freq(&self, term: &Term) -> crate::Result<u64> {
let mut total_doc_freq = 0;
for segment_reader in &self.segment_readers {
for segment_reader in &self.inner.segment_readers {
let inverted_index = segment_reader.inverted_index(term.field())?;
let doc_freq = inverted_index.doc_freq(term)?;
total_doc_freq += u64::from(doc_freq);
Expand All @@ -158,12 +136,12 @@ impl Searcher {

/// Return the list of segment readers
pub fn segment_readers(&self) -> &[SegmentReader] {
&self.segment_readers
&self.inner.segment_readers
}

/// Returns the segment_reader associated with the given segment_ord
pub fn segment_reader(&self, segment_ord: u32) -> &SegmentReader {
&self.segment_readers[segment_ord as usize]
&self.inner.segment_readers[segment_ord as usize]
}

/// Runs a query on the segment readers wrapped by the searcher.
Expand All @@ -185,7 +163,7 @@ impl Searcher {
query: &dyn Query,
collector: &C,
) -> crate::Result<C::Fruit> {
let executor = self.index.search_executor();
let executor = self.inner.index.search_executor();
self.search_with_executor(query, collector, executor)
}

Expand Down Expand Up @@ -222,17 +200,59 @@ impl Searcher {
/// Summarize total space usage of this searcher.
pub fn space_usage(&self) -> io::Result<SearcherSpaceUsage> {
let mut space_usage = SearcherSpaceUsage::new();
for segment_reader in &self.segment_readers {
for segment_reader in self.segment_readers() {
space_usage.add_segment(segment_reader.space_usage()?);
}
Ok(space_usage)
}
}

impl From<Arc<SearcherInner>> for Searcher {
fn from(inner: Arc<SearcherInner>) -> Self {
Searcher { inner }
}
}

/// Shared state behind a `Searcher`.
///
/// A `Searcher` only stores an `Arc<SearcherInner>`, so this struct is what
/// actually holds the list of `SegmentReader`s ready for search.
///
/// It guarantees that the `Segment` will not be removed before
/// the destruction of the `Searcher`.
pub(crate) struct SearcherInner {
schema: Schema,
index: Index,
segment_readers: Vec<SegmentReader>,
/// One store reader per entry of `segment_readers`, in the same order.
store_readers: Vec<StoreReader>,
generation: TrackedObject<SearcherGeneration>,
}

impl SearcherInner {
    /// Creates a new `SearcherInner`.
    ///
    /// One `StoreReader` is opened for each entry of `segment_readers`, in the
    /// same order, using `doc_store_cache_size` as its cache size.
    ///
    /// # Errors
    ///
    /// Returns the first `io::Error` reported by `get_store_reader`; no
    /// `SearcherInner` is built in that case.
    pub(crate) fn new(
        schema: Schema,
        index: Index,
        segment_readers: Vec<SegmentReader>,
        generation: TrackedObject<SearcherGeneration>,
        doc_store_cache_size: usize,
    ) -> io::Result<SearcherInner> {
        let mut store_readers: Vec<StoreReader> = Vec::with_capacity(segment_readers.len());
        for reader in &segment_readers {
            // `?` stops at the first failing segment.
            store_readers.push(reader.get_store_reader(doc_store_cache_size)?);
        }

        let searcher_inner = SearcherInner {
            schema,
            index,
            segment_readers,
            store_readers,
            generation,
        };
        Ok(searcher_inner)
    }
}

impl fmt::Debug for Searcher {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let segment_ids = self
.segment_readers
.segment_readers()
.iter()
.map(SegmentReader::segment_id)
.collect::<Vec<_>>();
Expand Down
10 changes: 3 additions & 7 deletions src/fastfield/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ pub use self::writer::BytesFastFieldWriter;

#[cfg(test)]
mod tests {
use std::ops::Deref;

use crate::query::TermQuery;
use crate::schema::{BytesOptions, IndexRecordOption, Schema, Value, FAST, INDEXED, STORED};
use crate::{DocAddress, DocSet, Index, Searcher, Term};
Expand Down Expand Up @@ -37,9 +35,7 @@ mod tests {
Ok(())
}

fn create_index_for_test<T: Into<BytesOptions>>(
byte_options: T,
) -> crate::Result<impl Deref<Target = Searcher>> {
fn create_index_for_test<T: Into<BytesOptions>>(byte_options: T) -> crate::Result<Searcher> {
let mut schema_builder = Schema::builder();
let field = schema_builder.add_bytes_field("string_bytes", byte_options.into());
let schema = schema_builder.build();
Expand Down Expand Up @@ -86,7 +82,7 @@ mod tests {
let field = searcher.schema().get_field("string_bytes").unwrap();
let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight = term_query.specialized_weight(&*searcher, true)?;
let term_weight = term_query.specialized_weight(&searcher, true)?;
let term_scorer = term_weight.specialized_scorer(searcher.segment_reader(0), 1.0)?;
assert_eq!(term_scorer.doc(), 0u32);
Ok(())
Expand All @@ -99,7 +95,7 @@ mod tests {
let field = searcher.schema().get_field("string_bytes").unwrap();
let term = Term::from_field_bytes(field, b"lucene".as_ref());
let term_query = TermQuery::new(term, IndexRecordOption::Basic);
let term_weight_err = term_query.specialized_weight(&*searcher, false);
let term_weight_err = term_query.specialized_weight(&searcher, false);
assert!(matches!(
term_weight_err,
Err(crate::TantivyError::SchemaError(_))
Expand Down
4 changes: 2 additions & 2 deletions src/fieldnorm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ mod tests {
Term::from_field_text(text, "hello"),
IndexRecordOption::WithFreqs,
);
let weight = query.weight(&*searcher, true)?;
let weight = query.weight(&searcher, true)?;
let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
assert_eq!(scorer.doc(), 0);
assert!((scorer.score() - 0.22920431).abs() < 0.001f32);
Expand Down Expand Up @@ -141,7 +141,7 @@ mod tests {
Term::from_field_text(text, "hello"),
IndexRecordOption::WithFreqs,
);
let weight = query.weight(&*searcher, true)?;
let weight = query.weight(&searcher, true)?;
let mut scorer = weight.scorer(searcher.segment_reader(0), 1.0f32)?;
assert_eq!(scorer.doc(), 0);
assert!((scorer.score() - 0.22920431).abs() < 0.001f32);
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ pub use crate::indexer::demuxer::*;
pub use crate::indexer::operation::UserOperation;
pub use crate::indexer::{merge_filtered_segments, merge_indices, IndexWriter, PreparedCommit};
pub use crate::postings::Postings;
pub use crate::reader::LeasedItem;
pub use crate::schema::{DateOptions, DatePrecision, Document, Term};

/// Index format version.
Expand Down
2 changes: 1 addition & 1 deletion src/query/phrase_query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ pub mod tests {
let matching_docs = |query: &str| {
let query_parser = QueryParser::for_index(&index, vec![json_field]);
let phrase_query = query_parser.parse_query(query).unwrap();
let phrase_weight = phrase_query.weight(&*searcher, false).unwrap();
let phrase_weight = phrase_query.weight(&searcher, false).unwrap();
let mut phrase_scorer = phrase_weight
.scorer(searcher.segment_reader(0), 1.0f32)
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/query/term_query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ mod tests {
let term_a = Term::from_field_text(text_field, "a");
let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
let reader = index.reader()?;
assert_eq!(term_query.count(&*reader.searcher())?, 1);
assert_eq!(term_query.count(&reader.searcher())?, 1);
Ok(())
}

Expand Down

0 comments on commit 23fe73a

Please sign in to comment.