expose doc store cache size
expose lru doc store cache size
optimize doc store cache size
PSeitz committed Jul 1, 2022
1 parent db18366 commit 4f57747
Showing 5 changed files with 89 additions and 19 deletions.
20 changes: 19 additions & 1 deletion src/core/searcher.rs
@@ -6,7 +6,7 @@ use crate::core::{Executor, SegmentReader};
 use crate::query::Query;
 use crate::schema::{Document, Schema, Term};
 use crate::space_usage::SearcherSpaceUsage;
-use crate::store::StoreReader;
+use crate::store::{CacheStats, StoreReader};
 use crate::{DocAddress, Index, Opstamp, SegmentId, TrackedObject};
 
 /// Identifies the searcher generation accessed by a [Searcher].
@@ -77,11 +77,17 @@ impl Searcher {
         index: Index,
         segment_readers: Vec<SegmentReader>,
         generation: TrackedObject<SearcherGeneration>,
+        doc_store_cache_size: usize,
     ) -> io::Result<Searcher> {
         let store_readers: Vec<StoreReader> = segment_readers
             .iter()
             .map(SegmentReader::get_store_reader)
             .collect::<io::Result<Vec<_>>>()?;
+
+        for store_reader in &store_readers {
+            store_reader.set_cache_size(doc_store_cache_size);
+        }
+
         Ok(Searcher {
             schema,
             index,
@@ -110,6 +116,18 @@ impl Searcher {
         store_reader.get(doc_address.doc_id)
     }
 
+    /// The cache stats for the underlying store readers.
+    ///
+    /// Aggregates the stats of each segment's store reader.
+    pub fn doc_store_cache_stats(&self) -> CacheStats {
+        let cache_stats: CacheStats = self
+            .store_readers
+            .iter()
+            .map(|reader| reader.cache_stats())
+            .sum();
+        cache_stats
+    }
+
     /// Fetches a document in an asynchronous manner.
     #[cfg(feature = "quickwit")]
     pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
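For callers, a minimal sketch of reading the new stats off a `Searcher` and deriving a hit rate. Only `doc_store_cache_stats` and the public `CacheStats` fields come from this diff; the function name is illustrative:

use tantivy::Searcher;

// Sketch: derive a doc store cache hit rate from the aggregated stats.
fn print_hit_rate(searcher: &Searcher) {
    let stats = searcher.doc_store_cache_stats();
    let lookups = stats.cache_hits + stats.cache_misses;
    if lookups > 0 {
        let hit_rate = stats.cache_hits as f64 / lookups as f64;
        println!("doc store cache hit rate: {:.1}%", 100.0 * hit_rate);
    }
}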
28 changes: 18 additions & 10 deletions src/indexer/merger.rs
@@ -1030,18 +1030,25 @@ impl IndexMerger {
         debug_time!("write-storable-fields");
         debug!("write-storable-field");
 
-        let store_readers: Vec<_> = self
-            .readers
-            .iter()
-            .map(|reader| reader.get_store_reader())
-            .collect::<Result<_, _>>()?;
-        let mut document_iterators: Vec<_> = store_readers
-            .iter()
-            .enumerate()
-            .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
-            .collect();
         if !doc_id_mapping.is_trivial() {
             debug!("non-trivial-doc-id-mapping");
+
+            let store_readers: Vec<_> = self
+                .readers
+                .iter()
+                .map(|reader| reader.get_store_reader())
+                .collect::<Result<_, _>>()?;
+
+            for store_reader in &store_readers {
+                store_reader.set_cache_size(50);
+            }
+
+            let mut document_iterators: Vec<_> = store_readers
+                .iter()
+                .enumerate()
+                .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
+                .collect();
+
             for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
                 let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize];
                 if let Some(doc_bytes_res) = doc_bytes_it.next() {
@@ -1059,6 +1066,7 @@
             debug!("trivial-doc-id-mapping");
             for reader in &self.readers {
                 let store_reader = reader.get_store_reader()?;
+                store_reader.set_cache_size(1);
                 if reader.has_deletes()
                 // If there is not enough data in the store, we avoid stacking in order to
                 // avoid creating many small blocks in the doc store. Once we have 5 full blocks,
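A note on the constants above: with a non-trivial doc-id mapping, documents are presumably pulled from each segment in remapped, non-sequential order, so each segment's store reader keeps a 50-block cache to retain some locality; with a trivial mapping each segment is scanned strictly sequentially, so one cached block per reader suffices. The resizing itself rests on plain LRU eviction. A standalone sketch of those semantics, assuming the `lru` crate's pre-0.8 API where capacities are plain `usize` values:

use lru::LruCache;

fn main() {
    // Assumed pre-0.8 `lru` API: `new` and `resize` take a plain usize.
    let mut cache: LruCache<usize, &str> = LruCache::new(100);
    cache.put(0, "block-0");
    cache.put(1, "block-1");

    // Shrinking the capacity evicts least-recently-used entries first.
    cache.resize(1);
    assert_eq!(cache.len(), 1);
    assert!(cache.get(&1).is_some()); // the most recently used entry survives
}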
6 changes: 6 additions & 0 deletions src/reader/mod.rs
@@ -13,6 +13,7 @@ use self::pool::Pool;
 use self::warming::WarmingState;
 use crate::core::searcher::SearcherGeneration;
 use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
+use crate::store::LRU_CACHE_CAPACITY;
 use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};
 
 /// Defines when a new version of the index should be reloaded.
@@ -47,6 +48,7 @@ pub struct IndexReaderBuilder {
     index: Index,
     warmers: Vec<Weak<dyn Warmer>>,
     num_warming_threads: usize,
+    doc_store_cache_size: usize,
 }
 
 impl IndexReaderBuilder {
@@ -58,6 +60,7 @@ impl IndexReaderBuilder {
             index,
             warmers: Vec::new(),
             num_warming_threads: 1,
+            doc_store_cache_size: LRU_CACHE_CAPACITY,
         }
     }
 
@@ -76,6 +79,7 @@
         let inner_reader = InnerIndexReader {
             index: self.index,
             num_searchers: self.num_searchers,
+            doc_store_cache_size: self.doc_store_cache_size,
             searcher_pool: Pool::new(),
             warming_state,
             searcher_generation_counter: Default::default(),
@@ -157,6 +161,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {
 
 struct InnerIndexReader {
     num_searchers: usize,
+    doc_store_cache_size: usize,
     index: Index,
     warming_state: WarmingState,
     searcher_pool: Pool<Searcher>,
@@ -203,6 +208,7 @@ impl InnerIndexReader {
                     self.index.clone(),
                     segment_readers.clone(),
                     searcher_generation.clone(),
+                    self.doc_store_cache_size,
                 )
             })
             .take(self.num_searchers)
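End to end, the new field flows from `IndexReaderBuilder` through `InnerIndexReader` into `Searcher::new`. A minimal usage sketch; note the `doc_store_cache_size(..)` setter is an assumption here, since the diff shows only the field and its default, not its setter:

use tantivy::{Index, IndexReader};

// Sketch: configure a larger doc store LRU cache when building a reader.
// The builder setter name is an assumption; this diff shows only the field.
fn reader_with_big_cache(index: &Index) -> tantivy::Result<IndexReader> {
    let reader: IndexReader = index
        .reader_builder()
        .doc_store_cache_size(500)
        .try_into()?;
    Ok(reader)
}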
3 changes: 2 additions & 1 deletion src/store/mod.rs
@@ -40,7 +40,8 @@ mod reader;
 mod writer;
 pub use self::compressors::{Compressor, ZstdCompressor};
 pub use self::decompressors::Decompressor;
-pub use self::reader::StoreReader;
+pub(crate) use self::reader::LRU_CACHE_CAPACITY;
+pub use self::reader::{CacheStats, StoreReader};
 pub use self::writer::StoreWriter;
 
 #[cfg(feature = "lz4-compression")]
51 changes: 44 additions & 7 deletions src/store/reader.rs
@@ -1,4 +1,6 @@
 use std::io;
+use std::iter::Sum;
+use std::ops::AddAssign;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
@@ -17,7 +19,7 @@ use crate::space_usage::StoreSpaceUsage;
 use crate::store::index::Checkpoint;
 use crate::DocId;
 
-const LRU_CACHE_CAPACITY: usize = 100;
+pub(crate) const LRU_CACHE_CAPACITY: usize = 100;
 
 type Block = OwnedBytes;
 
@@ -30,18 +32,13 @@ pub struct StoreReader {
     cache: BlockCache,
 }
 
+/// The cache for decompressed blocks.
 struct BlockCache {
     cache: Mutex<LruCache<usize, Block>>,
     cache_hits: Arc<AtomicUsize>,
     cache_misses: Arc<AtomicUsize>,
 }
 
-pub struct CacheStats {
-    pub num_entries: usize,
-    pub cache_hits: usize,
-    pub cache_misses: usize,
-}
-
 impl BlockCache {
     fn get_from_cache(&self, pos: usize) -> Option<Block> {
         if let Some(block) = self.cache.lock().unwrap().get(&pos) {
@@ -67,6 +64,10 @@ impl BlockCache {
         self.cache.lock().unwrap().len()
     }
 
+    fn set_size(&self, size: usize) {
+        self.cache.lock().unwrap().resize(size);
+    }
+
     #[cfg(test)]
     fn peek_lru(&self) -> Option<usize> {
         self.cache
@@ -77,6 +78,37 @@
     }
 }
 
+#[derive(Debug, Default)]
+/// CacheStats for the `StoreReader`.
+pub struct CacheStats {
+    /// The number of entries in the cache.
+    pub num_entries: usize,
+    /// The number of cache hits.
+    pub cache_hits: usize,
+    /// The number of cache misses.
+    pub cache_misses: usize,
+}
+
+impl AddAssign for CacheStats {
+    fn add_assign(&mut self, other: Self) {
+        *self = Self {
+            num_entries: self.num_entries + other.num_entries,
+            cache_hits: self.cache_hits + other.cache_hits,
+            cache_misses: self.cache_misses + other.cache_misses,
+        };
+    }
+}
+
+impl Sum for CacheStats {
+    fn sum<I: Iterator<Item = Self>>(mut iter: I) -> Self {
+        let mut first = iter.next().unwrap_or_default();
+        for el in iter {
+            first += el;
+        }
+        first
+    }
+}
+
 impl StoreReader {
     /// Opens a store reader
     pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
@@ -112,6 +144,11 @@ impl StoreReader {
         self.cache.stats()
     }
 
+    /// Sets the LRU cache size for decompressed blocks. Defaults to 100 (`LRU_CACHE_CAPACITY`).
+    pub(crate) fn set_cache_size(&self, size: usize) {
+        self.cache.set_size(size)
+    }
+
     /// Get checkpoint for DocId. The checkpoint can be used to load a block containing the
     /// document.
     ///
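The `AddAssign`/`Sum` pair above is what lets `Searcher::doc_store_cache_stats` fold per-segment stats with a single `.sum()` call. A small illustrative sketch with made-up numbers:

use tantivy::store::CacheStats;

// Sketch: aggregating per-segment cache stats, as Searcher::doc_store_cache_stats does.
fn main() {
    let per_segment = vec![
        CacheStats { num_entries: 10, cache_hits: 90, cache_misses: 10 },
        CacheStats { num_entries: 5, cache_hits: 40, cache_misses: 60 },
    ];
    let total: CacheStats = per_segment.into_iter().sum();
    assert_eq!(total.cache_hits, 130);
    assert_eq!(total.cache_misses, 70);
}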
