diff --git a/src/core/searcher.rs b/src/core/searcher.rs
index 1b8f1257e5..8f9bca8104 100644
--- a/src/core/searcher.rs
+++ b/src/core/searcher.rs
@@ -6,7 +6,7 @@ use crate::core::{Executor, SegmentReader};
 use crate::query::Query;
 use crate::schema::{Document, Schema, Term};
 use crate::space_usage::SearcherSpaceUsage;
-use crate::store::StoreReader;
+use crate::store::{CacheStats, StoreReader};
 use crate::{DocAddress, Index, Opstamp, SegmentId, TrackedObject};
 
 /// Identifies the searcher generation accessed by a [Searcher].
@@ -77,11 +77,13 @@ impl Searcher {
         index: Index,
         segment_readers: Vec<SegmentReader>,
         generation: TrackedObject<SearcherGeneration>,
+        doc_store_cache_size: usize,
     ) -> io::Result<Searcher> {
         let store_readers: Vec<StoreReader> = segment_readers
             .iter()
-            .map(SegmentReader::get_store_reader)
+            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
             .collect::<io::Result<Vec<StoreReader>>>()?;
+
         Ok(Searcher {
             schema,
             index,
@@ -110,6 +112,18 @@ impl Searcher {
         store_reader.get(doc_address.doc_id)
     }
 
+    /// The cache stats for the underlying store reader.
+    ///
+    /// Aggregates the sum for each segment store reader.
+    pub fn doc_store_cache_stats(&self) -> CacheStats {
+        let cache_stats: CacheStats = self
+            .store_readers
+            .iter()
+            .map(|reader| reader.cache_stats())
+            .sum();
+        cache_stats
+    }
+
     /// Fetches a document in an asynchronous manner.
     #[cfg(feature = "quickwit")]
     pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
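
For reference, a minimal sketch of how the new aggregated stats can be read once a searcher is built. The `doc_store_cache_stats()` accessor and the `CacheStats` fields come from this diff; the surrounding index setup is assumed:

```rust
use tantivy::Index;

// Sketch: read the aggregated doc store cache stats added in this patch.
// Assumes `index` is an existing tantivy Index with stored fields.
fn print_doc_store_cache_stats(index: &Index) -> tantivy::Result<()> {
    let searcher = index.reader()?.searcher();
    // Sums the per-segment CacheStats via the new `Sum` implementation.
    let stats = searcher.doc_store_cache_stats();
    println!(
        "doc store cache: {} entries, {} hits, {} misses",
        stats.num_entries, stats.cache_hits, stats.cache_misses
    );
    Ok(())
}
```
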
diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs
index dab64d8abd..26e0c41ecd 100644
--- a/src/core/segment_reader.rs
+++ b/src/core/segment_reader.rs
@@ -133,8 +133,8 @@ impl SegmentReader {
     }
 
     /// Accessor to the segment's `StoreReader`.
-    pub fn get_store_reader(&self) -> io::Result<StoreReader> {
-        StoreReader::open(self.store_file.clone())
+    pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
+        StoreReader::open(self.store_file.clone(), cache_size)
     }
 
     /// Open a new segment for reading.
@@ -326,7 +326,7 @@ impl SegmentReader {
             self.positions_composite.space_usage(),
             self.fast_fields_readers.space_usage(),
             self.fieldnorm_readers.space_usage(),
-            self.get_store_reader()?.space_usage(),
+            self.get_store_reader(0)?.space_usage(),
             self.alive_bitset_opt
                 .as_ref()
                 .map(AliveBitSet::space_usage)
diff --git a/src/functional_test.rs b/src/functional_test.rs
index e6be8bcc50..e0d0c8bfee 100644
--- a/src/functional_test.rs
+++ b/src/functional_test.rs
@@ -9,7 +9,7 @@ fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
     assert!(searcher.segment_readers().len() < 20);
     assert_eq!(searcher.num_docs() as usize, vals.len());
     for segment_reader in searcher.segment_readers() {
-        let store_reader = segment_reader.get_store_reader()?;
+        let store_reader = segment_reader.get_store_reader(1)?;
         for doc_id in 0..segment_reader.max_doc() {
             let _doc = store_reader.get(doc_id)?;
         }
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index 9d5a329893..8718c5370b 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -792,6 +792,7 @@ mod tests {
         self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
         TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
     };
+    use crate::store::DOCSTORE_CACHE_CAPACITY;
     use crate::{DocAddress, Index, IndexSettings, IndexSortByField, Order, ReloadPolicy, Term};
 
     const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \
@@ -1550,7 +1551,9 @@ mod tests {
 
         // doc store tests
         for segment_reader in searcher.segment_readers().iter() {
-            let store_reader = segment_reader.get_store_reader().unwrap();
+            let store_reader = segment_reader
+                .get_store_reader(DOCSTORE_CACHE_CAPACITY)
+                .unwrap();
             // test store iterator
             for doc in store_reader.iter(segment_reader.alive_bitset()) {
                 let id = doc.unwrap().get_first(id_field).unwrap().as_u64().unwrap();
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 3c4f2a2abc..340ca9127b 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1030,18 +1030,21 @@ impl IndexMerger {
         debug_time!("write-storable-fields");
         debug!("write-storable-field");
 
-        let store_readers: Vec<_> = self
-            .readers
-            .iter()
-            .map(|reader| reader.get_store_reader())
-            .collect::<io::Result<Vec<_>>>()?;
-        let mut document_iterators: Vec<_> = store_readers
-            .iter()
-            .enumerate()
-            .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
-            .collect();
         if !doc_id_mapping.is_trivial() {
             debug!("non-trivial-doc-id-mapping");
+
+            let store_readers: Vec<_> = self
+                .readers
+                .iter()
+                .map(|reader| reader.get_store_reader(50))
+                .collect::<io::Result<Vec<_>>>()?;
+
+            let mut document_iterators: Vec<_> = store_readers
+                .iter()
+                .enumerate()
+                .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
+                .collect();
+
             for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
                 let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize];
                 if let Some(doc_bytes_res) = doc_bytes_it.next() {
@@ -1058,7 +1061,7 @@ impl IndexMerger {
         } else {
             debug!("trivial-doc-id-mapping");
             for reader in &self.readers {
-                let store_reader = reader.get_store_reader()?;
+                let store_reader = reader.get_store_reader(1)?;
                 if reader.has_deletes()
                     // If there is not enough data in the store, we avoid stacking in order to
                     // avoid creating many small blocks in the doc store. Once we have 5 full blocks,
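
The cache sizes chosen here reflect access patterns: the sorted-merge path interleaves per-segment iterators and so keeps a somewhat larger cache (50), while the stacking path scans each segment sequentially, touching every block exactly once, so a single-slot cache is enough. A sketch of that sequential pattern, mirroring `check_index_content` above (the function name is illustrative):

```rust
use tantivy::SegmentReader;

// Sketch: a sequential scan decompresses each block once, so a one-slot
// LRU cache suffices, as in the trivial merge path above.
fn scan_store(segment_reader: &SegmentReader) -> tantivy::Result<()> {
    let store_reader = segment_reader.get_store_reader(1)?;
    for doc_id in 0..segment_reader.max_doc() {
        // Consecutive doc ids fall into the same block until a boundary is crossed.
        let _doc = store_reader.get(doc_id)?;
    }
    Ok(())
}
```
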
diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index 308cca255e..727eea9537 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -389,6 +389,7 @@ fn remap_and_write(
         serializer
             .segment()
             .open_read(SegmentComponent::TempStore)?,
+        50,
     )?;
     for old_doc_id in doc_id_map.iter_old_doc_ids() {
         let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
index a61172d3bf..ca00a9383c 100644
--- a/src/reader/mod.rs
+++ b/src/reader/mod.rs
@@ -13,6 +13,7 @@ use self::pool::Pool;
 use self::warming::WarmingState;
 use crate::core::searcher::SearcherGeneration;
 use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
+use crate::store::DOCSTORE_CACHE_CAPACITY;
 use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
 
 /// Defines when a new version of the index should be reloaded.
@@ -47,6 +48,7 @@ pub struct IndexReaderBuilder {
     index: Index,
     warmers: Vec<Weak<dyn Warmer>>,
     num_warming_threads: usize,
+    doc_store_cache_size: usize,
 }
 
 impl IndexReaderBuilder {
@@ -58,6 +60,7 @@ impl IndexReaderBuilder {
             index,
             warmers: Vec::new(),
             num_warming_threads: 1,
+            doc_store_cache_size: DOCSTORE_CACHE_CAPACITY,
         }
     }
 
@@ -76,6 +79,7 @@ impl IndexReaderBuilder {
         let inner_reader = InnerIndexReader {
             index: self.index,
             num_searchers: self.num_searchers,
+            doc_store_cache_size: self.doc_store_cache_size,
             searcher_pool: Pool::new(),
             warming_state,
             searcher_generation_counter: Default::default(),
@@ -157,6 +161,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
 
 struct InnerIndexReader {
     num_searchers: usize,
+    doc_store_cache_size: usize,
     index: Index,
     warming_state: WarmingState,
     searcher_pool: Pool<Searcher>,
@@ -203,6 +208,7 @@ impl InnerIndexReader {
                     self.index.clone(),
                     segment_readers.clone(),
                     searcher_generation.clone(),
+                    self.doc_store_cache_size,
                 )
             })
             .take(self.num_searchers)
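
The hunks above add the `doc_store_cache_size` field, its `DOCSTORE_CACHE_CAPACITY` default, and the plumbing into `Searcher::new`, but no builder setter is visible in this excerpt. Assuming the full patch exposes one (the method name below is hypothetical), configuring the cache would look roughly like this:

```rust
use std::convert::TryInto;

use tantivy::{Index, IndexReader, ReloadPolicy};

// Hypothetical sketch: `doc_store_cache_size(..)` as a builder setter is an
// assumption; only the field and its default appear in the visible hunks.
fn open_reader(index: &Index) -> tantivy::Result<IndexReader> {
    let reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::OnCommit)
        .doc_store_cache_size(10) // hypothetical setter for the new field
        .try_into()?;
    Ok(reader)
}
```
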
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 8dd035fe7f..bf57bbc1d0 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -40,7 +40,8 @@ mod reader;
 mod writer;
 pub use self::compressors::{Compressor, ZstdCompressor};
 pub use self::decompressors::Decompressor;
-pub use self::reader::StoreReader;
+pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
+pub use self::reader::{CacheStats, StoreReader};
 pub use self::writer::StoreWriter;
 
 #[cfg(feature = "lz4-compression")]
@@ -114,7 +115,7 @@ pub mod tests {
         let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
         let field_title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
-        let store = StoreReader::open(store_file)?;
+        let store = StoreReader::open(store_file, 10)?;
         for i in 0..NUM_DOCS as u32 {
             assert_eq!(
                 *store
@@ -154,7 +155,7 @@ pub mod tests {
         let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
         let field_title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
-        let store = StoreReader::open(store_file)?;
+        let store = StoreReader::open(store_file, 10)?;
         for i in 0..NUM_DOCS as u32 {
             assert_eq!(
                 *store
@@ -231,7 +232,7 @@ pub mod tests {
         let searcher = index.reader()?.searcher();
         let reader = searcher.segment_reader(0);
-        let store = reader.get_store_reader()?;
+        let store = reader.get_store_reader(10)?;
 
         for doc in store.iter(reader.alive_bitset()) {
             assert_eq!(
                 *doc?.get_first(text_field).unwrap().as_text().unwrap(),
@@ -267,7 +268,7 @@ pub mod tests {
         }
         assert_eq!(
             index.reader().unwrap().searcher().segment_readers()[0]
-                .get_store_reader()
+                .get_store_reader(10)
                 .unwrap()
                 .decompressor(),
             Decompressor::Lz4
@@ -288,7 +289,7 @@ pub mod tests {
         let searcher = index.reader().unwrap().searcher();
         assert_eq!(searcher.segment_readers().len(), 1);
         let reader = searcher.segment_readers().iter().last().unwrap();
-        let store = reader.get_store_reader().unwrap();
+        let store = reader.get_store_reader(10).unwrap();
 
         for doc in store.iter(reader.alive_bitset()).take(50) {
             assert_eq!(
@@ -335,7 +336,7 @@ pub mod tests {
         let searcher = index.reader()?.searcher();
         assert_eq!(searcher.segment_readers().len(), 1);
         let reader = searcher.segment_readers().iter().last().unwrap();
-        let store = reader.get_store_reader()?;
+        let store = reader.get_store_reader(10)?;
         assert_eq!(store.block_checkpoints().count(), 1);
         Ok(())
     }
@@ -379,7 +380,7 @@ mod bench {
             16_384,
         );
         let store_file = directory.open_read(path).unwrap();
-        let store = StoreReader::open(store_file).unwrap();
+        let store = StoreReader::open(store_file, 10).unwrap();
         b.iter(|| store.iter(None).collect::<Vec<_>>());
     }
 }
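
The test updates above pass an explicit cache size straight to `StoreReader::open`. Outside tests the reader is normally obtained through `SegmentReader::get_store_reader`, but opening a store file directly follows the same shape (the directory and path setup here are illustrative):

```rust
use std::path::Path;

use tantivy::directory::{Directory, RamDirectory};
use tantivy::store::StoreReader;

// Sketch: open a StoreReader with a 10-block LRU cache over an existing
// store file. Assumes the file at "store" was written beforehand.
fn open_store(directory: &RamDirectory) -> tantivy::Result<StoreReader> {
    let store_file = directory.open_read(Path::new("store"))?;
    Ok(StoreReader::open(store_file, 10)?)
}
```
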
diff --git a/src/store/reader.rs b/src/store/reader.rs
index 6d30b7613c..62afd4c04a 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -1,4 +1,6 @@
 use std::io;
+use std::iter::Sum;
+use std::ops::AddAssign;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
@@ -17,7 +19,7 @@ use crate::space_usage::StoreSpaceUsage;
 use crate::store::index::Checkpoint;
 use crate::DocId;
 
-const LRU_CACHE_CAPACITY: usize = 100;
+pub(crate) const DOCSTORE_CACHE_CAPACITY: usize = 100;
 
 type Block = OwnedBytes;
 
@@ -30,18 +32,13 @@ pub struct StoreReader {
     cache: BlockCache,
 }
 
+/// The cache for decompressed blocks.
 struct BlockCache {
     cache: Mutex<LruCache<usize, Block>>,
     cache_hits: Arc<AtomicUsize>,
     cache_misses: Arc<AtomicUsize>,
 }
 
-pub struct CacheStats {
-    pub num_entries: usize,
-    pub cache_hits: usize,
-    pub cache_misses: usize,
-}
-
 impl BlockCache {
     fn get_from_cache(&self, pos: usize) -> Option<Block> {
         if let Some(block) = self.cache.lock().unwrap().get(&pos) {
@@ -77,9 +74,40 @@ impl BlockCache {
     }
 }
 
+#[derive(Debug, Default)]
+/// CacheStats for the `StoreReader`.
+pub struct CacheStats {
+    /// The number of entries in the cache.
+    pub num_entries: usize,
+    /// The number of cache hits.
+    pub cache_hits: usize,
+    /// The number of cache misses.
+    pub cache_misses: usize,
+}
+
+impl AddAssign for CacheStats {
+    fn add_assign(&mut self, other: Self) {
+        *self = Self {
+            num_entries: self.num_entries + other.num_entries,
+            cache_hits: self.cache_hits + other.cache_hits,
+            cache_misses: self.cache_misses + other.cache_misses,
+        };
+    }
+}
+
+impl Sum for CacheStats {
+    fn sum<I: Iterator<Item = Self>>(mut iter: I) -> Self {
+        let mut first = iter.next().unwrap_or_default();
+        for el in iter {
+            first += el;
+        }
+        first
+    }
+}
+
 impl StoreReader {
     /// Opens a store reader
-    pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
+    pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
         let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
 
         let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
@@ -90,7 +118,7 @@ impl StoreReader {
             decompressor: footer.decompressor,
             data: data_file,
             cache: BlockCache {
-                cache: Mutex::new(LruCache::new(LRU_CACHE_CAPACITY)),
+                cache: Mutex::new(LruCache::new(cache_size)),
                 cache_hits: Default::default(),
                 cache_misses: Default::default(),
             },
@@ -368,7 +396,7 @@ mod tests {
         let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
         let title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
-        let store = StoreReader::open(store_file)?;
+        let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?;
         assert_eq!(store.cache.len(), 0);
         assert_eq!(store.cache_stats().cache_hits, 0);
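
Finally, since `CacheStats` derives `Default` and implements `AddAssign` and `Sum`, per-segment stats fold with a plain `.sum()`, which is what `Searcher::doc_store_cache_stats` relies on. A standalone illustration:

```rust
use tantivy::store::CacheStats;

// Sketch: folding per-segment stats with the `Sum` impl added above.
fn main() {
    let per_segment = vec![
        CacheStats {
            num_entries: 10,
            cache_hits: 7,
            cache_misses: 3,
        },
        CacheStats {
            num_entries: 20,
            cache_hits: 1,
            cache_misses: 19,
        },
    ];
    let total: CacheStats = per_segment.into_iter().sum();
    assert_eq!(total.cache_hits, 8);
    assert_eq!(total.cache_misses, 22);
    // An empty iterator sums to CacheStats::default() via unwrap_or_default().
    let empty: CacheStats = std::iter::empty::<CacheStats>().sum();
    assert_eq!(empty.num_entries, 0);
}
```
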