expose doc store cache size #1403

Merged: 2 commits, Jul 5, 2022
18 changes: 16 additions & 2 deletions src/core/searcher.rs
@@ -6,7 +6,7 @@ use crate::core::{Executor, SegmentReader};
use crate::query::Query;
use crate::schema::{Document, Schema, Term};
use crate::space_usage::SearcherSpaceUsage;
use crate::store::StoreReader;
use crate::store::{CacheStats, StoreReader};
use crate::{DocAddress, Index, Opstamp, SegmentId, TrackedObject};

/// Identifies the searcher generation accessed by a [Searcher].
@@ -77,11 +77,13 @@ impl Searcher {
index: Index,
segment_readers: Vec<SegmentReader>,
generation: TrackedObject<SearcherGeneration>,
doc_store_cache_size: usize,
) -> io::Result<Searcher> {
let store_readers: Vec<StoreReader> = segment_readers
.iter()
.map(SegmentReader::get_store_reader)
.map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size))
.collect::<io::Result<Vec<_>>>()?;

Ok(Searcher {
schema,
index,
@@ -110,6 +112,18 @@ impl Searcher {
store_reader.get(doc_address.doc_id)
}

/// The cache stats for the underlying store reader.
///
/// Aggregates the sum for each segment store reader.
pub fn doc_store_cache_stats(&self) -> CacheStats {
let cache_stats: CacheStats = self
.store_readers
.iter()
.map(|reader| reader.cache_stats())
.sum();
cache_stats
}

/// Fetches a document in an asynchronous manner.
#[cfg(feature = "quickwit")]
pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result<Document> {
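
For context, a minimal usage sketch of the new `Searcher::doc_store_cache_stats()` accessor added above (hypothetical calling code, not part of this diff; it assumes an `Index` and the usual `reader()`/`searcher()` flow, and prints the three `CacheStats` fields defined later in src/store/reader.rs):

    // Hypothetical sketch, not part of the PR.
    let searcher = index.reader()?.searcher();
    // ... fetch some documents via searcher.doc(doc_address) so the cache gets exercised ...
    let stats = searcher.doc_store_cache_stats(); // summed over all segment store readers
    println!(
        "doc store cache: {} entries, {} hits, {} misses",
        stats.num_entries, stats.cache_hits, stats.cache_misses
    );
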
6 changes: 3 additions & 3 deletions src/core/segment_reader.rs
@@ -133,8 +133,8 @@ impl SegmentReader {
}

/// Accessor to the segment's `StoreReader`.
pub fn get_store_reader(&self) -> io::Result<StoreReader> {
StoreReader::open(self.store_file.clone())
pub fn get_store_reader(&self, cache_size: usize) -> io::Result<StoreReader> {
StoreReader::open(self.store_file.clone(), cache_size)
}

/// Open a new segment for reading.
@@ -326,7 +326,7 @@ impl SegmentReader {
self.positions_composite.space_usage(),
self.fast_fields_readers.space_usage(),
self.fieldnorm_readers.space_usage(),
self.get_store_reader()?.space_usage(),
self.get_store_reader(0)?.space_usage(),
self.alive_bitset_opt
.as_ref()
.map(AliveBitSet::space_usage)
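
A short sketch of the per-segment call with its new `cache_size` parameter (illustrative only; `searcher` is assumed, and the size is the number of decompressed blocks kept in the store reader's LRU cache, per src/store/reader.rs):

    // Hypothetical sketch: open each segment's store reader with an explicit cache size.
    for segment_reader in searcher.segment_readers() {
        let store_reader = segment_reader.get_store_reader(32)?; // keep up to 32 decompressed blocks
        for doc_id in 0..segment_reader.max_doc() {
            let _doc = store_reader.get(doc_id)?;
        }
    }
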
2 changes: 1 addition & 1 deletion src/functional_test.rs
@@ -9,7 +9,7 @@ fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> {
assert!(searcher.segment_readers().len() < 20);
assert_eq!(searcher.num_docs() as usize, vals.len());
for segment_reader in searcher.segment_readers() {
let store_reader = segment_reader.get_store_reader()?;
let store_reader = segment_reader.get_store_reader(1)?;
for doc_id in 0..segment_reader.max_doc() {
let _doc = store_reader.get(doc_id)?;
}
5 changes: 4 additions & 1 deletion src/indexer/index_writer.rs
@@ -792,6 +792,7 @@ mod tests {
self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions,
TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT,
};
use crate::store::DOCSTORE_CACHE_CAPACITY;
use crate::{DocAddress, Index, IndexSettings, IndexSortByField, Order, ReloadPolicy, Term};

const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \
@@ -1550,7 +1551,9 @@

// doc store tests
for segment_reader in searcher.segment_readers().iter() {
let store_reader = segment_reader.get_store_reader().unwrap();
let store_reader = segment_reader
.get_store_reader(DOCSTORE_CACHE_CAPACITY)
.unwrap();
// test store iterator
for doc in store_reader.iter(segment_reader.alive_bitset()) {
let id = doc.unwrap().get_first(id_field).unwrap().as_u64().unwrap();
25 changes: 14 additions & 11 deletions src/indexer/merger.rs
@@ -1030,18 +1030,21 @@ impl IndexMerger {
debug_time!("write-storable-fields");
debug!("write-storable-field");

let store_readers: Vec<_> = self
.readers
.iter()
.map(|reader| reader.get_store_reader())
.collect::<Result<_, _>>()?;
let mut document_iterators: Vec<_> = store_readers
.iter()
.enumerate()
.map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
.collect();
if !doc_id_mapping.is_trivial() {
debug!("non-trivial-doc-id-mapping");

let store_readers: Vec<_> = self
.readers
.iter()
.map(|reader| reader.get_store_reader(50))
.collect::<Result<_, _>>()?;

let mut document_iterators: Vec<_> = store_readers
.iter()
.enumerate()
.map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset()))
.collect();

for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() {
let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize];
if let Some(doc_bytes_res) = doc_bytes_it.next() {
@@ -1058,7 +1061,7 @@
} else {
debug!("trivial-doc-id-mapping");
for reader in &self.readers {
let store_reader = reader.get_store_reader()?;
let store_reader = reader.get_store_reader(1)?;
if reader.has_deletes()
// If there is not enough data in the store, we avoid stacking in order to
// avoid creating many small blocks in the doc store. Once we have 5 full blocks,
1 change: 1 addition & 0 deletions src/indexer/segment_writer.rs
@@ -389,6 +389,7 @@ fn remap_and_write(
serializer
.segment()
.open_read(SegmentComponent::TempStore)?,
50,
)?;
for old_doc_id in doc_id_map.iter_old_doc_ids() {
let doc_bytes = store_read.get_document_bytes(old_doc_id)?;
6 changes: 6 additions & 0 deletions src/reader/mod.rs
@@ -13,6 +13,7 @@ use self::pool::Pool;
use self::warming::WarmingState;
use crate::core::searcher::SearcherGeneration;
use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK};
use crate::store::DOCSTORE_CACHE_CAPACITY;
use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject};

/// Defines when a new version of the index should be reloaded.
@@ -47,6 +48,7 @@ pub struct IndexReaderBuilder {
index: Index,
warmers: Vec<Weak<dyn Warmer>>,
num_warming_threads: usize,
doc_store_cache_size: usize,
}

impl IndexReaderBuilder {
@@ -58,6 +60,7 @@ impl IndexReaderBuilder {
index,
warmers: Vec::new(),
num_warming_threads: 1,
doc_store_cache_size: DOCSTORE_CACHE_CAPACITY,
}
}

@@ -76,6 +79,7 @@ impl IndexReaderBuilder {
let inner_reader = InnerIndexReader {
index: self.index,
num_searchers: self.num_searchers,
doc_store_cache_size: self.doc_store_cache_size,
searcher_pool: Pool::new(),
warming_state,
searcher_generation_counter: Default::default(),
@@ -157,6 +161,7 @@ impl TryInto<IndexReader> for IndexReaderBuilder {

struct InnerIndexReader {
num_searchers: usize,
doc_store_cache_size: usize,
index: Index,
warming_state: WarmingState,
searcher_pool: Pool<Searcher>,
@@ -203,6 +208,7 @@ impl InnerIndexReader {
self.index.clone(),
segment_readers.clone(),
searcher_generation.clone(),
self.doc_store_cache_size,
)
})
.take(self.num_searchers)
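
For illustration, a sketch of configuring the cache size at reader-build time. Only the `doc_store_cache_size` field and its `DOCSTORE_CACHE_CAPACITY` default are visible in the hunks above; the builder setter itself is not shown, so the method name used below is an assumption:

    // Hypothetical sketch; the setter name is assumed, not shown in this diff.
    let reader = index
        .reader_builder()
        .doc_store_cache_size(500) // blocks kept per store reader (default: DOCSTORE_CACHE_CAPACITY)
        .reload_policy(ReloadPolicy::OnCommit)
        .try_into()?;
    let searcher = reader.searcher();
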
17 changes: 9 additions & 8 deletions src/store/mod.rs
@@ -40,7 +40,8 @@ mod reader;
mod writer;
pub use self::compressors::{Compressor, ZstdCompressor};
pub use self::decompressors::Decompressor;
pub use self::reader::StoreReader;
pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY;
pub use self::reader::{CacheStats, StoreReader};
pub use self::writer::StoreWriter;

#[cfg(feature = "lz4-compression")]
@@ -114,7 +115,7 @@ pub mod tests {
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
let store = StoreReader::open(store_file, 10)?;
for i in 0..NUM_DOCS as u32 {
assert_eq!(
*store
@@ -154,7 +155,7 @@
let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
let field_title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
let store = StoreReader::open(store_file, 10)?;
for i in 0..NUM_DOCS as u32 {
assert_eq!(
*store
@@ -231,7 +232,7 @@ pub mod tests {

let searcher = index.reader()?.searcher();
let reader = searcher.segment_reader(0);
let store = reader.get_store_reader()?;
let store = reader.get_store_reader(10)?;
for doc in store.iter(reader.alive_bitset()) {
assert_eq!(
*doc?.get_first(text_field).unwrap().as_text().unwrap(),
@@ -267,7 +268,7 @@
}
assert_eq!(
index.reader().unwrap().searcher().segment_readers()[0]
.get_store_reader()
.get_store_reader(10)
.unwrap()
.decompressor(),
Decompressor::Lz4
@@ -288,7 +289,7 @@
let searcher = index.reader().unwrap().searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let reader = searcher.segment_readers().iter().last().unwrap();
let store = reader.get_store_reader().unwrap();
let store = reader.get_store_reader(10).unwrap();

for doc in store.iter(reader.alive_bitset()).take(50) {
assert_eq!(
@@ -335,7 +336,7 @@ pub mod tests {
let searcher = index.reader()?.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
let reader = searcher.segment_readers().iter().last().unwrap();
let store = reader.get_store_reader()?;
let store = reader.get_store_reader(10)?;
assert_eq!(store.block_checkpoints().count(), 1);
Ok(())
}
@@ -379,7 +380,7 @@ mod bench {
16_384,
);
let store_file = directory.open_read(path).unwrap();
let store = StoreReader::open(store_file).unwrap();
let store = StoreReader::open(store_file, 10).unwrap();
b.iter(|| store.iter(None).collect::<Vec<_>>());
}
}
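
A minimal sketch of opening a `StoreReader` directly with the new cache-size argument, mirroring the test code above (the `directory` and `path` setup is assumed to follow the same pattern as those tests):

    // Hypothetical sketch mirroring the tests above.
    let store_file = directory.open_read(path)?;
    let store = StoreReader::open(store_file, 10)?; // second argument: LRU capacity in blocks
    let doc = store.get(0)?; // decompresses (and caches) the block containing doc 0
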
48 changes: 38 additions & 10 deletions src/store/reader.rs
@@ -1,4 +1,6 @@
use std::io;
use std::iter::Sum;
use std::ops::AddAssign;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

@@ -17,7 +19,7 @@ use crate::space_usage::StoreSpaceUsage;
use crate::store::index::Checkpoint;
use crate::DocId;

const LRU_CACHE_CAPACITY: usize = 100;
pub(crate) const DOCSTORE_CACHE_CAPACITY: usize = 100;

type Block = OwnedBytes;

@@ -30,18 +32,13 @@ pub struct StoreReader {
cache: BlockCache,
}

/// The cache for decompressed blocks.
struct BlockCache {
cache: Mutex<LruCache<usize, Block>>,
cache_hits: Arc<AtomicUsize>,
cache_misses: Arc<AtomicUsize>,
}

pub struct CacheStats {
pub num_entries: usize,
pub cache_hits: usize,
pub cache_misses: usize,
}

impl BlockCache {
fn get_from_cache(&self, pos: usize) -> Option<Block> {
if let Some(block) = self.cache.lock().unwrap().get(&pos) {
@@ -77,9 +74,40 @@
}
}

#[derive(Debug, Default)]
/// CacheStats for the `StoreReader`.
pub struct CacheStats {
/// The number of entries in the cache
pub num_entries: usize,
/// The number of cache hits.
pub cache_hits: usize,
/// The number of cache misses.
pub cache_misses: usize,
}

impl AddAssign for CacheStats {
fn add_assign(&mut self, other: Self) {
*self = Self {
num_entries: self.num_entries + other.num_entries,
cache_hits: self.cache_hits + other.cache_hits,
cache_misses: self.cache_misses + other.cache_misses,
};
}
}

impl Sum for CacheStats {
fn sum<I: Iterator<Item = Self>>(mut iter: I) -> Self {
let mut first = iter.next().unwrap_or_default();
for el in iter {
first += el;
}
first
}
}

impl StoreReader {
/// Opens a store reader
pub fn open(store_file: FileSlice) -> io::Result<StoreReader> {
pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result<StoreReader> {
let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?;

let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize);
@@ -90,7 +118,7 @@ impl StoreReader {
decompressor: footer.decompressor,
data: data_file,
cache: BlockCache {
cache: Mutex::new(LruCache::new(LRU_CACHE_CAPACITY)),
cache: Mutex::new(LruCache::new(cache_size)),
cache_hits: Default::default(),
cache_misses: Default::default(),
},
@@ -368,7 +396,7 @@ mod tests {
let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE);
let title = schema.get_field("title").unwrap();
let store_file = directory.open_read(path)?;
let store = StoreReader::open(store_file)?;
let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?;

assert_eq!(store.cache.len(), 0);
assert_eq!(store.cache_stats().cache_hits, 0);
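
For reference, a small sketch of how the `Sum` and `AddAssign` impls above combine per-segment stats (the same aggregation `Searcher::doc_store_cache_stats()` performs); the literal values are illustrative only:

    // Hypothetical sketch of aggregating CacheStats.
    let per_segment = vec![
        CacheStats { num_entries: 10, cache_hits: 90, cache_misses: 10 },
        CacheStats { num_entries: 5, cache_hits: 40, cache_misses: 5 },
    ];
    let total: CacheStats = per_segment.into_iter().sum();
    assert_eq!(total.cache_hits, 130);
    assert_eq!(total.cache_misses, 15);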