diff --git a/src/core/searcher.rs b/src/core/searcher.rs
index 1b8f1257e5..35cbd3715d 100644
--- a/src/core/searcher.rs
+++ b/src/core/searcher.rs
@@ -6,7 +6,7 @@ use crate::core::{Executor, SegmentReader};
 use crate::query::Query;
 use crate::schema::{Document, Schema, Term};
 use crate::space_usage::SearcherSpaceUsage;
-use crate::store::StoreReader;
+use crate::store::{CacheStats, StoreReader};
 use crate::{DocAddress, Index, Opstamp, SegmentId, TrackedObject};
 
 /// Identifies the searcher generation accessed by a [Searcher].
@@ -77,11 +77,17 @@ impl Searcher {
         index: Index,
         segment_readers: Vec<SegmentReader>,
         generation: TrackedObject<SearcherGeneration>,
+        doc_store_cache_size: usize,
     ) -> io::Result<Searcher> {
         let store_readers: Vec<StoreReader> = segment_readers
             .iter()
             .map(SegmentReader::get_store_reader)
             .collect::<io::Result<Vec<_>>>()?;
+
+        for store_reader in &store_readers {
+            store_reader.set_cache_size(doc_store_cache_size);
+        }
+
         Ok(Searcher {
             schema,
             index,
@@ -110,6 +116,18 @@ impl Searcher {
         store_reader.get(doc_address.doc_id)
     }
 
+    /// The cache stats for the underlying store readers.
+    ///
+    /// Aggregates the cache stats of all segment store readers.
+    pub fn doc_store_cache_stats(&self) -> CacheStats {
+        let cache_stats: CacheStats = self
+            .store_readers
+            .iter()
+            .map(|reader| reader.cache_stats())
+            .sum();
+        cache_stats
+    }
+
     /// Fetches a document in an asynchronous manner.
     #[cfg(feature = "quickwit")]
     pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs
index 3c4f2a2abc..3195815a9d 100644
--- a/src/indexer/merger.rs
+++ b/src/indexer/merger.rs
@@ -1030,18 +1030,25 @@ impl IndexMerger {
         debug_time!("write-storable-fields");
         debug!("write-storable-field");
 
-        let store_readers: Vec<_> = self
-            .readers
-            .iter()
-            .map(|reader| reader.get_store_reader())
-            .collect::<io::Result<_>>()?;
-        let mut document_iterators: Vec<_> = store_readers
-            .iter()
-            .enumerate()
-            .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
-            .collect();
         if !doc_id_mapping.is_trivial() {
             debug!("non-trivial-doc-id-mapping");
+
+            let store_readers: Vec<_> = self
+                .readers
+                .iter()
+                .map(|reader| reader.get_store_reader())
+                .collect::<io::Result<_>>()?;
+
+            for store_reader in &store_readers {
+                store_reader.set_cache_size(50);
+            }
+
+            let mut document_iterators: Vec<_> = store_readers
+                .iter()
+                .enumerate()
+                .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
+                .collect();
+
             for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
                 let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize];
                 if let Some(doc_bytes_res) = doc_bytes_it.next() {
@@ -1059,6 +1066,7 @@ impl IndexMerger {
             debug!("trivial-doc-id-mapping");
             for reader in &self.readers {
                 let store_reader = reader.get_store_reader()?;
+                store_reader.set_cache_size(1);
                 if reader.has_deletes()
                     // If there is not enough data in the store, we avoid stacking in order to
                     // avoid creating many small blocks in the doc store. Once we have 5 full blocks,
diff --git a/src/reader/mod.rs b/src/reader/mod.rs
index a61172d3bf..7e7d4bbdf9 100644
--- a/src/reader/mod.rs
+++ b/src/reader/mod.rs
@@ -13,6 +13,7 @@ use self::pool::Pool;
 use self::warming::WarmingState;
 use crate::core::searcher::SearcherGeneration;
 use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
+use crate::store::LRU_CACHE_CAPACITY;
 use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
 
 /// Defines when a new version of the index should be reloaded.
@@ -47,6 +48,7 @@ pub struct IndexReaderBuilder {
     index: Index,
     warmers: Vec<Weak<dyn Warmer>>,
     num_warming_threads: usize,
+    doc_store_cache_size: usize,
 }
 
 impl IndexReaderBuilder {
@@ -58,6 +60,7 @@ impl IndexReaderBuilder {
             index,
             warmers: Vec::new(),
             num_warming_threads: 1,
+            doc_store_cache_size: LRU_CACHE_CAPACITY,
         }
     }
 
@@ -76,6 +79,7 @@ impl IndexReaderBuilder {
         let inner_reader = InnerIndexReader {
             index: self.index,
             num_searchers: self.num_searchers,
+            doc_store_cache_size: self.doc_store_cache_size,
             searcher_pool: Pool::new(),
             warming_state,
             searcher_generation_counter: Default::default(),
@@ -157,6 +161,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
 
 struct InnerIndexReader {
     num_searchers: usize,
+    doc_store_cache_size: usize,
     index: Index,
     warming_state: WarmingState,
     searcher_pool: Pool<Searcher>,
@@ -203,6 +208,7 @@ impl InnerIndexReader {
                     self.index.clone(),
                     segment_readers.clone(),
                     searcher_generation.clone(),
+                    self.doc_store_cache_size,
                 )
             })
             .take(self.num_searchers)
diff --git a/src/store/mod.rs b/src/store/mod.rs
index 8dd035fe7f..5e9edf10ae 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -40,7 +40,8 @@ mod reader;
 mod writer;
 pub use self::compressors::{Compressor, ZstdCompressor};
 pub use self::decompressors::Decompressor;
-pub use self::reader::StoreReader;
+pub(crate) use self::reader::LRU_CACHE_CAPACITY;
+pub use self::reader::{CacheStats, StoreReader};
 pub use self::writer::StoreWriter;
 
 #[cfg(feature = "lz4-compression")]
diff --git a/src/store/reader.rs b/src/store/reader.rs
index 6d30b7613c..e2b31b2b83 100644
--- a/src/store/reader.rs
+++ b/src/store/reader.rs
@@ -1,4 +1,6 @@
 use std::io;
+use std::iter::Sum;
+use std::ops::AddAssign;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
@@ -17,7 +19,7 @@ use crate::space_usage::StoreSpaceUsage;
 use crate::store::index::Checkpoint;
 use crate::DocId;
 
-const LRU_CACHE_CAPACITY: usize = 100;
+pub(crate) const LRU_CACHE_CAPACITY: usize = 100;
 
 type Block = OwnedBytes;
 
@@ -30,18 +32,13 @@ pub struct StoreReader {
     cache: BlockCache,
 }
 
+/// The cache for decompressed blocks.
 struct BlockCache {
     cache: Mutex<LruCache<usize, Block>>,
     cache_hits: Arc<AtomicUsize>,
     cache_misses: Arc<AtomicUsize>,
 }
 
-pub struct CacheStats {
-    pub num_entries: usize,
-    pub cache_hits: usize,
-    pub cache_misses: usize,
-}
-
 impl BlockCache {
     fn get_from_cache(&self, pos: usize) -> Option<Block> {
         if let Some(block) = self.cache.lock().unwrap().get(&pos) {
@@ -67,6 +64,10 @@ impl BlockCache {
         self.cache.lock().unwrap().len()
     }
 
+    fn set_size(&self, size: usize) {
+        self.cache.lock().unwrap().resize(size);
+    }
+
     #[cfg(test)]
     fn peek_lru(&self) -> Option<usize> {
         self.cache
@@ -77,6 +78,37 @@ impl BlockCache {
     }
 }
 
+/// `CacheStats` for the `StoreReader`.
+#[derive(Debug, Default)]
+pub struct CacheStats {
+    /// The number of entries in the cache.
+    pub num_entries: usize,
+    /// The number of cache hits.
+    pub cache_hits: usize,
+    /// The number of cache misses.
+    pub cache_misses: usize,
+}
+
+impl AddAssign for CacheStats {
+    fn add_assign(&mut self, other: Self) {
+        *self = Self {
+            num_entries: self.num_entries + other.num_entries,
+            cache_hits: self.cache_hits + other.cache_hits,
+            cache_misses: self.cache_misses + other.cache_misses,
+        };
+    }
+}
+
+impl Sum for CacheStats {
+    fn sum<I: Iterator<Item = Self>>(mut iter: I) -> Self {
+        let mut first = iter.next().unwrap_or_default();
+        for el in iter {
+            first += el;
+        }
+        first
+    }
+}
+
 impl StoreReader {
     /// Opens a store reader
     pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
@@ -112,6 +144,11 @@ impl StoreReader {
         self.cache.stats()
     }
 
+    /// Sets the LRU cache size for decompressed blocks. Defaults to 100 (`LRU_CACHE_CAPACITY`).
+    pub(crate) fn set_cache_size(&self, size: usize) {
+        self.cache.set_size(size)
+    }
+
     /// Get checkpoint for DocId. The checkpoint can be used to load a block containing the
     /// document.
     ///
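Usage note (not part of the diff): a minimal sketch of how the new stats surface to callers. It assumes standard tantivy boilerplate (`Index::create_in_ram`, the `doc!` macro, `DocAddress::new`) as found in recent releases; `add_document`'s return value is ignored because its type differs across versions. No public setter for `doc_store_cache_size` appears in these hunks, so the reader below runs with the default `LRU_CACHE_CAPACITY` of 100 blocks.

```rust
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::{doc, DocAddress, Index};

fn main() -> tantivy::Result<()> {
    // One stored field, so documents are written to the doc store.
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT | STORED);
    let index = Index::create_in_ram(schema_builder.build());

    let mut index_writer = index.writer(50_000_000)?;
    index_writer.add_document(doc!(title => "first"));
    index_writer.add_document(doc!(title => "second"));
    index_writer.commit()?;

    // Each searcher's store readers are sized with the builder's
    // doc_store_cache_size (LRU_CACHE_CAPACITY by default).
    let reader = index.reader()?;
    let searcher = reader.searcher();

    // The first fetch decompresses a block (one miss); refetching the
    // same block is served from the LRU cache (one hit).
    let addr = DocAddress::new(0, 0);
    searcher.doc(addr)?;
    searcher.doc(addr)?;

    // CacheStats summed over every segment store reader via the new Sum impl.
    let stats = searcher.doc_store_cache_stats();
    println!(
        "entries={} hits={} misses={}",
        stats.num_entries, stats.cache_hits, stats.cache_misses
    );
    Ok(())
}
```

The merge-path constants are deliberate: 50 blocks while reordering documents under a non-trivial doc-id mapping, and a single block on the stacking path, presumably because stacking streams each block at most once. Either way, merges no longer pin up to 100 decompressed blocks per segment reader.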