From b5402ea91204f5a67b64f53c9e100a82916277f0 Mon Sep 17 00:00:00 2001 From: Pascal Seitz Date: Mon, 4 Jul 2022 13:54:22 +0800 Subject: [PATCH] set docstore cache size at construction --- src/core/searcher.rs | 6 +----- src/core/segment_reader.rs | 6 +++--- src/functional_test.rs | 2 +- src/indexer/index_writer.rs | 5 ++++- src/indexer/merger.rs | 9 ++------- src/indexer/segment_writer.rs | 1 + src/reader/mod.rs | 4 ++-- src/store/mod.rs | 10 +++++----- src/store/reader.rs | 17 ++++------------- 9 files changed, 23 insertions(+), 37 deletions(-) diff --git a/src/core/searcher.rs b/src/core/searcher.rs index 35cbd3715d..8f9bca8104 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -81,13 +81,9 @@ impl Searcher { ) -> io::Result<Searcher> { let store_readers: Vec<StoreReader> = segment_readers .iter() - .map(SegmentReader::get_store_reader) + .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size)) .collect::<io::Result<Vec<_>>>()?; - for store_reader in &store_readers { - store_reader.set_cache_size(doc_store_cache_size); - } - Ok(Searcher { schema, index, diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index dab64d8abd..26e0c41ecd 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -133,8 +133,8 @@ impl SegmentReader { } /// Accessor to the segment's `StoreReader`. - pub fn get_store_reader(&self) -> io::Result<StoreReader> { - StoreReader::open(self.store_file.clone()) + pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> { + StoreReader::open(self.store_file.clone(), cache_size) } /// Open a new segment for reading. 
@@ -326,7 +326,7 @@ impl SegmentReader { self.positions_composite.space_usage(), self.fast_fields_readers.space_usage(), self.fieldnorm_readers.space_usage(), - self.get_store_reader()?.space_usage(), + self.get_store_reader(0)?.space_usage(), self.alive_bitset_opt .as_ref() .map(AliveBitSet::space_usage) diff --git a/src/functional_test.rs b/src/functional_test.rs index e6be8bcc50..e0d0c8bfee 100644 --- a/src/functional_test.rs +++ b/src/functional_test.rs @@ -9,7 +9,7 @@ fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> { assert!(searcher.segment_readers().len() < 20); assert_eq!(searcher.num_docs() as usize, vals.len()); for segment_reader in searcher.segment_readers() { - let store_reader = segment_reader.get_store_reader()?; + let store_reader = segment_reader.get_store_reader(1)?; for doc_id in 0..segment_reader.max_doc() { let _doc = store_reader.get(doc_id)?; } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 9d5a329893..8718c5370b 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -792,6 +792,7 @@ mod tests { self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions, TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT, }; + use crate::store::DOCSTORE_CACHE_CAPACITY; use crate::{DocAddress, Index, IndexSettings, IndexSortByField, Order, ReloadPolicy, Term}; const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \ @@ -1550,7 +1551,9 @@ mod tests { // doc store tests for segment_reader in searcher.segment_readers().iter() { - let store_reader = segment_reader.get_store_reader().unwrap(); + let store_reader = segment_reader + .get_store_reader(DOCSTORE_CACHE_CAPACITY) + .unwrap(); // test store iterator for doc in store_reader.iter(segment_reader.alive_bitset()) { let id = doc.unwrap().get_first(id_field).unwrap().as_u64().unwrap(); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 
3195815a9d..340ca9127b 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1036,13 +1036,9 @@ impl IndexMerger { let store_readers: Vec<_> = self .readers .iter() - .map(|reader| reader.get_store_reader()) + .map(|reader| reader.get_store_reader(50)) .collect::<io::Result<_>>()?; - for store_reader in &store_readers { - store_reader.set_cache_size(50); - } - let mut document_iterators: Vec<_> = store_readers .iter() .enumerate() @@ -1065,8 +1061,7 @@ impl IndexMerger { } else { debug!("trivial-doc-id-mapping"); for reader in &self.readers { - let store_reader = reader.get_store_reader()?; - store_reader.set_cache_size(1); + let store_reader = reader.get_store_reader(1)?; if reader.has_deletes() // If there is not enough data in the store, we avoid stacking in order to // avoid creating many small blocks in the doc store. Once we have 5 full blocks, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 308cca255e..727eea9537 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -389,6 +389,7 @@ fn remap_and_write( serializer .segment() .open_read(SegmentComponent::TempStore)?, + 50, )?; for old_doc_id in doc_id_map.iter_old_doc_ids() { let doc_bytes = store_read.get_document_bytes(old_doc_id)?; diff --git a/src/reader/mod.rs b/src/reader/mod.rs index 7e7d4bbdf9..ca00a9383c 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -13,7 +13,7 @@ use self::pool::Pool; use self::warming::WarmingState; use crate::core::searcher::SearcherGeneration; use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK}; -use crate::store::LRU_CACHE_CAPACITY; +use crate::store::DOCSTORE_CACHE_CAPACITY; use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject}; /// Defines when a new version of the index should be reloaded. 
@@ -60,7 +60,7 @@ impl IndexReaderBuilder { index, warmers: Vec::new(), num_warming_threads: 1, - doc_store_cache_size: LRU_CACHE_CAPACITY, + doc_store_cache_size: DOCSTORE_CACHE_CAPACITY, } } diff --git a/src/store/mod.rs b/src/store/mod.rs index 5e9edf10ae..51b4dccad0 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -40,7 +40,7 @@ mod reader; mod writer; pub use self::compressors::{Compressor, ZstdCompressor}; pub use self::decompressors::Decompressor; -pub(crate) use self::reader::LRU_CACHE_CAPACITY; +pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY; pub use self::reader::{CacheStats, StoreReader}; pub use self::writer::StoreWriter; @@ -115,7 +115,7 @@ pub mod tests { let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE); let field_title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; - let store = StoreReader::open(store_file)?; + let store = StoreReader::open(store_file, 10)?; for i in 0..NUM_DOCS as u32 { assert_eq!( *store @@ -155,7 +155,7 @@ pub mod tests { let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize); let field_title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; - let store = StoreReader::open(store_file)?; + let store = StoreReader::open(store_file, 10)?; for i in 0..NUM_DOCS as u32 { assert_eq!( *store @@ -232,7 +232,7 @@ pub mod tests { let searcher = index.reader()?.searcher(); let reader = searcher.segment_reader(0); - let store = reader.get_store_reader()?; + let store = reader.get_store_reader(10)?; for doc in store.iter(reader.alive_bitset()) { assert_eq!( *doc?.get_first(text_field).unwrap().as_text().unwrap(), @@ -336,7 +336,7 @@ pub mod tests { let searcher = index.reader()?.searcher(); assert_eq!(searcher.segment_readers().len(), 1); let reader = searcher.segment_readers().iter().last().unwrap(); - let store = reader.get_store_reader()?; + let store = reader.get_store_reader(10)?; 
assert_eq!(store.block_checkpoints().count(), 1); Ok(()) } diff --git a/src/store/reader.rs b/src/store/reader.rs index e2b31b2b83..62afd4c04a 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -19,7 +19,7 @@ use crate::space_usage::StoreSpaceUsage; use crate::store::index::Checkpoint; use crate::DocId; -pub(crate) const LRU_CACHE_CAPACITY: usize = 100; +pub(crate) const DOCSTORE_CACHE_CAPACITY: usize = 100; type Block = OwnedBytes; @@ -64,10 +64,6 @@ impl BlockCache { self.cache.lock().unwrap().len() } - fn set_size(&self, size: usize) { - self.cache.lock().unwrap().resize(size); - } - #[cfg(test)] fn peek_lru(&self) -> Option<usize> { self.cache @@ -111,7 +107,7 @@ impl Sum for CacheStats { impl StoreReader { /// Opens a store reader - pub fn open(store_file: FileSlice) -> io::Result<StoreReader> { + pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> { let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?; let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize); @@ -122,7 +118,7 @@ impl StoreReader { decompressor: footer.decompressor, data: data_file, cache: BlockCache { - cache: Mutex::new(LruCache::new(LRU_CACHE_CAPACITY)), + cache: Mutex::new(LruCache::new(cache_size)), cache_hits: Default::default(), cache_misses: Default::default(), }, @@ -144,11 +140,6 @@ impl StoreReader { self.cache.stats() } - /// Set lru cache size for decompressed blocks. Defaults to 100 (LRU_CACHE_CAPACITY). - pub(crate) fn set_cache_size(&self, size: usize) { - self.cache.set_size(size) - } - /// Get checkpoint for DocId. The checkpoint can be used to load a block containing the /// document. 
/// @@ -405,7 +396,7 @@ mod tests { let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE); let title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; - let store = StoreReader::open(store_file)?; + let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?; assert_eq!(store.cache.len(), 0); assert_eq!(store.cache_stats().cache_hits, 0);