set docstore cache size at construction
PSeitz committed Jul 4, 2022
1 parent 9db2f0e commit b5402ea
Showing 9 changed files with 23 additions and 37 deletions.
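
The change threads the doc store cache size through construction instead of mutating a freshly opened reader. A minimal before/after sketch of the API, using only names that appear in the diffs below:

    // Before this commit: open with the default capacity, then resize.
    let store_reader = segment_reader.get_store_reader()?;
    store_reader.set_cache_size(50);

    // After this commit: the capacity is a constructor argument.
    let store_reader = segment_reader.get_store_reader(50)?;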
6 changes: 1 addition & 5 deletions src/core/searcher.rs
@@ -81,13 +81,9 @@ impl Searcher {
     ) -> io::Result<Searcher> {
         let store_readers: Vec<StoreReader> = segment_readers
             .iter()
-            .map(SegmentReader::get_store_reader)
+            .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
             .collect::<io::Result<Vec<_>>>()?;
 
-        for store_reader in &store_readers {
-            store_reader.set_cache_size(doc_store_cache_size);
-        }
-
         Ok(Searcher {
             schema,
             index,
6 changes: 3 additions & 3 deletions src/core/segment_reader.rs
@@ -133,8 +133,8 @@ impl SegmentReader {
     }
 
     /// Accessor to the segment's `StoreReader`.
-    pub fn get_store_reader(&self) -> io::Result<StoreReader> {
-        StoreReader::open(self.store_file.clone())
+    pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
+        StoreReader::open(self.store_file.clone(), cache_size)
     }
 
     /// Open a new segment for reading.
@@ -326,7 +326,7 @@ impl SegmentReader {
             self.positions_composite.space_usage(),
             self.fast_fields_readers.space_usage(),
             self.fieldnorm_readers.space_usage(),
-            self.get_store_reader()?.space_usage(),
+            self.get_store_reader(0)?.space_usage(),
             self.alive_bitset_opt
                 .as_ref()
                 .map(AliveBitSet::space_usage)
2 changes: 1 addition & 1 deletion src/functional_test.rs
@@ -9,7 +9,7 @@ fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
     assert!(searcher.segment_readers().len() < 20);
     assert_eq!(searcher.num_docs() as usize, vals.len());
     for segment_reader in searcher.segment_readers() {
-        let store_reader = segment_reader.get_store_reader()?;
+        let store_reader = segment_reader.get_store_reader(1)?;
         for doc_id in 0..segment_reader.max_doc() {
             let _doc = store_reader.get(doc_id)?;
         }
5 changes: 4 additions & 1 deletion src/indexer/index_writer.rs
@@ -792,6 +792,7 @@ mod tests {
         self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
         TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
     };
+    use crate::store::DOCSTORE_CACHE_CAPACITY;
     use crate::{DocAddress, Index, IndexSettings, IndexSortByField, Order, ReloadPolicy, Term};
 
     const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \
@@ -1550,7 +1551,9 @@
 
         // doc store tests
         for segment_reader in searcher.segment_readers().iter() {
-            let store_reader = segment_reader.get_store_reader().unwrap();
+            let store_reader = segment_reader
+                .get_store_reader(DOCSTORE_CACHE_CAPACITY)
+                .unwrap();
             // test store iterator
             for doc in store_reader.iter(segment_reader.alive_bitset()) {
                 let id = doc.unwrap().get_first(id_field).unwrap().as_u64().unwrap();
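
As context for the test above: the store iterator takes the segment's alive bitset and skips deleted documents. A short sketch of that access pattern, mirroring the calls in the diff (the id_field handle and the surrounding crate::Result context are assumptions):

    // Iterate every live document in one segment's doc store.
    let store_reader = segment_reader.get_store_reader(DOCSTORE_CACHE_CAPACITY)?;
    for doc in store_reader.iter(segment_reader.alive_bitset()) {
        // Each item is a Result; the test unwraps, this sketch propagates.
        let id = doc?.get_first(id_field).unwrap().as_u64().unwrap();
        println!("live doc with id {}", id);
    }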
9 changes: 2 additions & 7 deletions src/indexer/merger.rs
@@ -1036,13 +1036,9 @@ impl IndexMerger {
         let store_readers: Vec<_> = self
             .readers
             .iter()
-            .map(|reader| reader.get_store_reader())
+            .map(|reader| reader.get_store_reader(50))
             .collect::<Result<_, _>>()?;
 
-        for store_reader in &store_readers {
-            store_reader.set_cache_size(50);
-        }
-
         let mut document_iterators: Vec<_> = store_readers
             .iter()
             .enumerate()
@@ -1065,8 +1061,7 @@
         } else {
             debug!("trivial-doc-id-mapping");
             for reader in &self.readers {
-                let store_reader = reader.get_store_reader()?;
-                store_reader.set_cache_size(1);
+                let store_reader = reader.get_store_reader(1)?;
                 if reader.has_deletes()
                 // If there is not enough data in the store, we avoid stacking in order to
                 // avoid creating many small blocks in the doc store. Once we have 5 full blocks,
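
The two sizes appear to match the merger's access patterns: a non-trivial doc-id mapping interleaves reads across all readers, so each store reader keeps up to 50 decompressed blocks, while the trivial mapping scans each store front to back and only ever has one block live. A sketch of the sequential case, under that assumption:

    // In a strictly sequential scan, consecutive doc ids stay in the same
    // decompressed block until a block boundary is crossed, so a one-entry
    // cache is already enough to avoid repeated decompression.
    let store_reader = reader.get_store_reader(1)?;
    for doc_id in 0..reader.max_doc() {
        let _doc = store_reader.get(doc_id)?;
    }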
1 change: 1 addition & 0 deletions src/indexer/segment_writer.rs
@@ -389,6 +389,7 @@ fn remap_and_write(
         serializer
             .segment()
             .open_read(SegmentComponent::TempStore)?,
+        50,
     )?;
     for old_doc_id in doc_id_map.iter_old_doc_ids() {
         let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
4 changes: 2 additions & 2 deletions src/reader/mod.rs
@@ -13,7 +13,7 @@ use self::pool::Pool;
 use self::warming::WarmingState;
 use crate::core::searcher::SearcherGeneration;
 use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
-use crate::store::LRU_CACHE_CAPACITY;
+use crate::store::DOCSTORE_CACHE_CAPACITY;
 use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
 
 /// Defines when a new version of the index should be reloaded.
@@ -60,7 +60,7 @@ impl IndexReaderBuilder {
             index,
             warmers: Vec::new(),
             num_warming_threads: 1,
-            doc_store_cache_size: LRU_CACHE_CAPACITY,
+            doc_store_cache_size: DOCSTORE_CACHE_CAPACITY,
         }
     }

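
Renaming the constant pairs with the doc_store_cache_size builder field above. A hedged usage sketch, assuming IndexReaderBuilder also exposes a doc_store_cache_size setter for that field (the setter is not part of this diff):

    // Hypothetical: override the doc store cache when building a reader.
    let reader = index
        .reader_builder()
        .doc_store_cache_size(32) // hold up to 32 decompressed blocks
        .try_into()?;
    let searcher = reader.searcher();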
10 changes: 5 additions & 5 deletions src/store/mod.rs
@@ -40,7 +40,7 @@ mod reader;
 mod writer;
 pub use self::compressors::{Compressor, ZstdCompressor};
 pub use self::decompressors::Decompressor;
-pub(crate) use self::reader::LRU_CACHE_CAPACITY;
+pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
 pub use self::reader::{CacheStats, StoreReader};
 pub use self::writer::StoreWriter;

@@ -115,7 +115,7 @@ pub mod tests {
         let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
         let field_title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
-        let store = StoreReader::open(store_file)?;
+        let store = StoreReader::open(store_file, 10)?;
         for i in 0..NUM_DOCS as u32 {
             assert_eq!(
                 *store
@@ -155,7 +155,7 @@
         let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
         let field_title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
-        let store = StoreReader::open(store_file)?;
+        let store = StoreReader::open(store_file, 10)?;
         for i in 0..NUM_DOCS as u32 {
             assert_eq!(
                 *store
@@ -232,7 +232,7 @@ pub mod tests {
 
         let searcher = index.reader()?.searcher();
         let reader = searcher.segment_reader(0);
-        let store = reader.get_store_reader()?;
+        let store = reader.get_store_reader(10)?;
         for doc in store.iter(reader.alive_bitset()) {
             assert_eq!(
                 *doc?.get_first(text_field).unwrap().as_text().unwrap(),
@@ -336,7 +336,7 @@ pub mod tests {
         let searcher = index.reader()?.searcher();
         assert_eq!(searcher.segment_readers().len(), 1);
         let reader = searcher.segment_readers().iter().last().unwrap();
-        let store = reader.get_store_reader()?;
+        let store = reader.get_store_reader(10)?;
         assert_eq!(store.block_checkpoints().count(), 1);
         Ok(())
     }
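
Every call site now states its capacity explicitly. A minimal sketch of opening a store directly, following the tests above (directory, path, and the capacity of 10 blocks are taken from the test code):

    // Open the raw store file and cache up to 10 decompressed blocks.
    let store_file = directory.open_read(path)?;
    let store = StoreReader::open(store_file, 10)?;
    let first_doc = store.get(0)?; // fetch the document with DocId 0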
17 changes: 4 additions & 13 deletions src/store/reader.rs
@@ -19,7 +19,7 @@ use crate::space_usage::StoreSpaceUsage;
 use crate::store::index::Checkpoint;
 use crate::DocId;
 
-pub(crate) const LRU_CACHE_CAPACITY: usize = 100;
+pub(crate) const DOCSTORE_CACHE_CAPACITY: usize = 100;
 
 type Block = OwnedBytes;

@@ -64,10 +64,6 @@ impl BlockCache {
         self.cache.lock().unwrap().len()
     }
 
-    fn set_size(&self, size: usize) {
-        self.cache.lock().unwrap().resize(size);
-    }
-
     #[cfg(test)]
     fn peek_lru(&self) -> Option<usize> {
         self.cache
@@ -111,7 +107,7 @@ impl Sum for CacheStats {
 
 impl StoreReader {
     /// Opens a store reader
-    pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
+    pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
         let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;
 
         let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
@@ -122,7 +118,7 @@ impl StoreReader {
             decompressor: footer.decompressor,
             data: data_file,
             cache: BlockCache {
-                cache: Mutex::new(LruCache::new(LRU_CACHE_CAPACITY)),
+                cache: Mutex::new(LruCache::new(cache_size)),
                 cache_hits: Default::default(),
                 cache_misses: Default::default(),
             },
@@ -144,11 +140,6 @@ impl StoreReader {
         self.cache.stats()
     }
 
-    /// Set lru cache size for decompressed blocks. Defaults to 100 (LRU_CACHE_CAPACITY).
-    pub(crate) fn set_cache_size(&self, size: usize) {
-        self.cache.set_size(size)
-    }
-
     /// Get checkpoint for DocId. The checkpoint can be used to load a block containing the
     /// document.
     ///
@@ -405,7 +396,7 @@ mod tests {
         let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
         let title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
-        let store = StoreReader::open(store_file)?;
+        let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?;
 
         assert_eq!(store.cache.len(), 0);
         assert_eq!(store.cache_stats().cache_hits, 0);
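
Net effect in reader.rs: the block cache's capacity is fixed once, when StoreReader::open builds it, and the resize path is gone. A self-contained sketch of that pattern with hypothetical names (the lru crate of this era, as used in the diff, accepts a plain usize capacity):

    use std::sync::Mutex;

    use lru::LruCache;

    // Stand-in for the doc store's BlockCache: capacity is a construction
    // parameter, so no set_size/resize method is needed afterwards.
    struct BlockCache {
        cache: Mutex<LruCache<usize, Vec<u8>>>,
    }

    impl BlockCache {
        fn with_capacity(cache_size: usize) -> BlockCache {
            BlockCache {
                cache: Mutex::new(LruCache::new(cache_size)),
            }
        }

        fn get_block(&self, block_offset: usize) -> Option<Vec<u8>> {
            self.cache.lock().unwrap().get(&block_offset).cloned()
        }

        fn put_block(&self, block_offset: usize, block: Vec<u8>) {
            self.cache.lock().unwrap().put(block_offset, block);
        }
    }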
