diff --git a/.github/workflows/long_running.yml b/.github/workflows/long_running.yml index 692c3500b8..c0de149212 100644 --- a/.github/workflows/long_running.yml +++ b/.github/workflows/long_running.yml @@ -9,16 +9,21 @@ env: NUM_FUNCTIONAL_TEST_ITERATIONS: 20000 jobs: - functional_test_unsorted: + test: + runs-on: ubuntu-latest + steps: - uses: actions/checkout@v3 + - name: Install stable + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + components: rustfmt, clippy + - name: Run indexing_unsorted run: cargo test indexing_unsorted -- --ignored - functional_test_sorted: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - name: Run indexing_sorted run: cargo test indexing_sorted -- --ignored diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2604f17de1..01af351950 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,8 +16,6 @@ jobs: steps: - uses: actions/checkout@v3 - - name: Build - run: cargo build --verbose --workspace - name: Install latest nightly to test also against unstable feature flag uses: actions-rs/toolchain@v1 with: @@ -25,13 +23,16 @@ jobs: override: true components: rustfmt - - name: Install latest nightly to test also against unstable feature flag + - name: Install stable uses: actions-rs/toolchain@v1 with: toolchain: stable override: true components: rustfmt, clippy + - name: Build + run: cargo build --verbose --workspace + - name: Run tests run: cargo +stable test --features mmap,brotli-compression,lz4-compression,snappy-compression,zstd-compression,failpoints --verbose --workspace diff --git a/Cargo.toml b/Cargo.toml index dcb01229fd..6ecc4a5d4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,7 @@ proptest = "1.0.0" criterion = "0.3.5" test-log = "0.2.10" env_logger = "0.9.0" -pprof = { version = "0.9.0", features = ["flamegraph", "criterion"] } +pprof = { version = "0.10.0", features = ["flamegraph", "criterion"] } futures = "0.3.21" [dev-dependencies.fail] diff --git a/bitpacker/Cargo.toml b/bitpacker/Cargo.toml index 6f9eb5ae5c..48c1b8a1c0 100644 --- a/bitpacker/Cargo.toml +++ b/bitpacker/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "tantivy-bitpacker" version = "0.2.0" -edition = "2018" +edition = "2021" authors = ["Paul Masurel "] license = "MIT" categories = [] diff --git a/common/Cargo.toml b/common/Cargo.toml index 6a7df405b8..f7085d9c13 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -3,7 +3,7 @@ name = "tantivy-common" version = "0.3.0" authors = ["Paul Masurel ", "Pascal Seitz "] license = "MIT" -edition = "2018" +edition = "2021" description = "common traits and utility functions used by multiple tantivy subcrates" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/fastfield_codecs/Cargo.toml b/fastfield_codecs/Cargo.toml index 26d587487d..1a52025ce2 100644 --- a/fastfield_codecs/Cargo.toml +++ b/fastfield_codecs/Cargo.toml @@ -3,7 +3,7 @@ name = "fastfield_codecs" version = "0.2.0" authors = ["Pascal Seitz "] license = "MIT" -edition = "2018" +edition = "2021" description = "Fast field codecs used by tantivy" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/ownedbytes/Cargo.toml b/ownedbytes/Cargo.toml index 40a8cbf3e3..b06db9f930 100644 --- a/ownedbytes/Cargo.toml +++ b/ownedbytes/Cargo.toml @@ -2,7 +2,7 @@ authors = ["Paul Masurel ", "Pascal Seitz "] name = "ownedbytes" version = "0.3.0" -edition = "2018" +edition = "2021" 
description = "Expose data as static slice" license = "MIT" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/query-grammar/Cargo.toml b/query-grammar/Cargo.toml index b395de9138..02c967bbcf 100644 --- a/query-grammar/Cargo.toml +++ b/query-grammar/Cargo.toml @@ -9,9 +9,9 @@ homepage = "https://github.com/quickwit-oss/tantivy" repository = "https://github.com/quickwit-oss/tantivy" readme = "README.md" keywords = ["search", "information", "retrieval"] -edition = "2018" +edition = "2021" [dependencies] combine = {version="4", default-features=false, features=[] } once_cell = "1.7.2" -regex ={ version = "1.5.4", default-features = false, features = ["std"] } +regex ={ version = "1.5.4", default-features = false, features = ["std", "unicode"] } diff --git a/query-grammar/src/query_grammar.rs b/query-grammar/src/query_grammar.rs index dc8d06fbf3..4f05fbbea5 100644 --- a/query-grammar/src/query_grammar.rs +++ b/query-grammar/src/query_grammar.rs @@ -16,9 +16,9 @@ use crate::Occur; // Note: '-' char is only forbidden at the beginning of a field name, would be clearer to add it to // special characters. const SPECIAL_CHARS: &[char] = &[ - '+', '^', '`', ':', '{', '}', '"', '[', ']', '(', ')', '~', '!', '\\', '*', ' ', + '+', '^', '`', ':', '{', '}', '"', '[', ']', '(', ')', '!', '\\', '*', ' ', ]; -const ESCAPED_SPECIAL_CHARS_PATTERN: &str = r#"\\(\+|\^|`|:|\{|\}|"|\[|\]|\(|\)|\~|!|\\|\*|\s)"#; +const ESCAPED_SPECIAL_CHARS_PATTERN: &str = r#"\\(\+|\^|`|:|\{|\}|"|\[|\]|\(|\)|!|\\|\*|\s)"#; /// Parses a field_name /// A field name must have at least one character and be followed by a colon. @@ -120,22 +120,36 @@ fn date_time<'a>() -> impl Parser<&'a str, Output = String> { fn term_val<'a>() -> impl Parser<&'a str, Output = String> { let phrase = char('"').with(many1(satisfy(|c| c != '"'))).skip(char('"')); - phrase.or(word()) + negative_number().or(phrase.or(word())) } fn term_query<'a>() -> impl Parser<&'a str, Output = UserInputLiteral> { - let term_val_with_field = negative_number().or(term_val()); - (field_name(), term_val_with_field).map(|(field_name, phrase)| UserInputLiteral { + (field_name(), term_val(), slop_val()).map(|(field_name, phrase, slop)| UserInputLiteral { field_name: Some(field_name), phrase, + slop, + }) +} + +fn slop_val<'a>() -> impl Parser<&'a str, Output = u32> { + let slop = + (char('~'), many1(digit())).and_then(|(_, slop): (_, String)| match slop.parse::() { + Ok(d) => Ok(d), + _ => Err(StringStreamError::UnexpectedParse), + }); + optional(slop).map(|slop| match slop { + Some(d) => d, + _ => 0, }) } fn literal<'a>() -> impl Parser<&'a str, Output = UserInputLeaf> { - let term_default_field = term_val().map(|phrase| UserInputLiteral { + let term_default_field = (term_val(), slop_val()).map(|(phrase, slop)| UserInputLiteral { field_name: None, phrase, + slop, }); + attempt(term_query()) .or(term_default_field) .map(UserInputLeaf::from) @@ -522,18 +536,10 @@ mod test { super::field_name().parse(".my.field.name:a"), Ok((".my.field.name".to_string(), "a")) ); - assert_eq!( - super::field_name().parse(r#"my\ field:a"#), - Ok(("my field".to_string(), "a")) - ); assert_eq!( super::field_name().parse(r#"にんじん:a"#), Ok(("にんじん".to_string(), "a")) ); - assert_eq!( - super::field_name().parse("my\\ field\\ name:a"), - Ok(("my field name".to_string(), "a")) - ); assert_eq!( super::field_name().parse(r#"my\field:a"#), Ok((r#"my\field"#.to_string(), "a")) @@ -562,6 +568,17 @@ mod test { super::field_name().parse("_my_field:a"), 
Ok(("_my_field".to_string(), "a")) ); + assert_eq!( + super::field_name().parse("~my~field:a"), + Ok(("~my~field".to_string(), "a")) + ); + for special_char in SPECIAL_CHARS.iter() { + let query = &format!("\\{special_char}my\\{special_char}field:a"); + assert_eq!( + super::field_name().parse(&query), + Ok((format!("{special_char}my{special_char}field"), "a")) + ); + } } #[test] @@ -714,4 +731,22 @@ mod test { ); test_is_parse_err("abc + "); } + + #[test] + fn test_slop() { + assert!(parse_to_ast().parse("\"a b\"~").is_err()); + assert!(parse_to_ast().parse("foo:\"a b\"~").is_err()); + assert!(parse_to_ast().parse("\"a b\"~a").is_err()); + assert!(parse_to_ast().parse("\"a b\"~100000000000000000").is_err()); + + test_parse_query_to_ast_helper("\"a b\"^2~4", "(*(\"a b\")^2 *\"~4\")"); + test_parse_query_to_ast_helper("\"~Document\"", "\"~Document\""); + test_parse_query_to_ast_helper("~Document", "\"~Document\""); + test_parse_query_to_ast_helper("a~2", "\"a~2\""); + test_parse_query_to_ast_helper("\"a b\"~0", "\"a b\""); + test_parse_query_to_ast_helper("\"a b\"~1", "\"a b\"~1"); + test_parse_query_to_ast_helper("\"a b\"~3", "\"a b\"~3"); + test_parse_query_to_ast_helper("foo:\"a b\"~300", "\"foo\":\"a b\"~300"); + test_parse_query_to_ast_helper("\"a b\"~300^2", "(\"a b\"~300)^2"); + } } diff --git a/query-grammar/src/user_input_ast.rs b/query-grammar/src/user_input_ast.rs index 359900bab1..3130ddbbe6 100644 --- a/query-grammar/src/user_input_ast.rs +++ b/query-grammar/src/user_input_ast.rs @@ -40,14 +40,19 @@ impl Debug for UserInputLeaf { pub struct UserInputLiteral { pub field_name: Option, pub phrase: String, + pub slop: u32, } impl fmt::Debug for UserInputLiteral { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self.field_name { - Some(ref field_name) => write!(formatter, "\"{}\":\"{}\"", field_name, self.phrase), - None => write!(formatter, "\"{}\"", self.phrase), + if let Some(ref field) = self.field_name { + write!(formatter, "\"{}\":", field)?; } + write!(formatter, "\"{}\"", self.phrase)?; + if self.slop > 0 { + write!(formatter, "~{}", self.slop)?; + } + Ok(()) } } diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index c9510d9263..f3638fae82 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -9,6 +9,7 @@ use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_valida use crate::collector::{Collector, SegmentCollector}; use crate::{SegmentReader, TantivyError}; +/// The default max bucket count, before the aggregation fails. pub const MAX_BUCKET_COUNT: u32 = 65000; /// Collector for aggregations. @@ -22,6 +23,7 @@ pub struct AggregationCollector { impl AggregationCollector { /// Create collector from aggregation request. /// + /// Aggregation fails when the total bucket count is higher than max_bucket_count. 
/// max_bucket_count will default to `MAX_BUCKET_COUNT` (65000) when unset pub fn from_aggs(agg: Aggregations, max_bucket_count: Option) -> Self { Self { diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 20eef59c07..8e03ad15ac 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -280,11 +280,9 @@ impl IntermediateBucketResult { .collect::>>()?; buckets.sort_by(|left, right| { - // TODO use total_cmp next stable rust release left.from .unwrap_or(f64::MIN) - .partial_cmp(&right.from.unwrap_or(f64::MIN)) - .unwrap_or(Ordering::Equal) + .total_cmp(&right.from.unwrap_or(f64::MIN)) }); Ok(BucketResult::Range { buckets }) } @@ -441,12 +439,9 @@ impl IntermediateTermBucketResult { }) .collect::>>()?; - buckets_with_val.sort_by(|(_, val1), (_, val2)| { - // TODO use total_cmp in next rust stable release - match &order { - Order::Desc => val2.partial_cmp(val1).unwrap_or(std::cmp::Ordering::Equal), - Order::Asc => val1.partial_cmp(val2).unwrap_or(std::cmp::Ordering::Equal), - } + buckets_with_val.sort_by(|(_, val1), (_, val2)| match &order { + Order::Desc => val2.total_cmp(val1), + Order::Asc => val1.total_cmp(val2), }); buckets = buckets_with_val .into_iter() diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index 7f6f8378ce..69ae782db7 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -166,6 +166,7 @@ use std::fmt::Display; pub use collector::{ AggregationCollector, AggregationSegmentCollector, DistributedAggregationCollector, + MAX_BUCKET_COUNT, }; use itertools::Itertools; use serde::{Deserialize, Serialize}; diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index e2ef47f989..fc514c8164 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -271,8 +271,8 @@ impl Collector for FacetCollector { let mut facet_streamer = facet_reader.facet_dict().range().into_stream()?; if facet_streamer.advance() { 'outer: loop { - // at the begining of this loop, facet_streamer - // is positionned on a term that has not been processed yet. + // at the beginning of this loop, facet_streamer + // is positioned on a term that has not been processed yet. let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it); match skip_result { SkipResult::Found => { diff --git a/src/collector/tests.rs b/src/collector/tests.rs index 3bda822a10..ee48cf3014 100644 --- a/src/collector/tests.rs +++ b/src/collector/tests.rs @@ -69,10 +69,8 @@ pub fn test_filter_collector() -> crate::Result<()> { /// Stores all of the doc ids. /// This collector is only used for tests. -/// It is unusable in pr -/// -/// actise, as it does not store -/// the segment ordinals +/// It is unusable in practise, as it does +/// not store the segment ordinals pub struct TestCollector { pub compute_score: bool, } diff --git a/src/collector/top_collector.rs b/src/collector/top_collector.rs index 03c923d763..691ef324b0 100644 --- a/src/collector/top_collector.rs +++ b/src/collector/top_collector.rs @@ -137,7 +137,7 @@ where T: PartialOrd + Clone /// sorted by type `T`. /// /// The implementation is based on a `BinaryHeap`. -/// The theorical complexity for collecting the top `K` out of `n` documents +/// The theoretical complexity for collecting the top `K` out of `n` documents /// is `O(n log K)`. 
pub(crate) struct TopSegmentCollector { limit: usize, diff --git a/src/collector/top_score_collector.rs b/src/collector/top_score_collector.rs index 516dedcb58..a0e40cc80f 100644 --- a/src/collector/top_score_collector.rs +++ b/src/collector/top_score_collector.rs @@ -79,7 +79,7 @@ where /// sorted by their score. /// /// The implementation is based on a `BinaryHeap`. -/// The theorical complexity for collecting the top `K` out of `n` documents +/// The theoretical complexity for collecting the top `K` out of `n` documents /// is `O(n log K)`. /// /// This collector guarantees a stable sorting in case of a tie on the @@ -283,7 +283,7 @@ impl TopDocs { /// /// # See also /// - /// To confortably work with `u64`s, `i64`s, `f64`s, or `date`s, please refer to + /// To comfortably work with `u64`s, `i64`s, `f64`s, or `date`s, please refer to /// [.order_by_fast_field(...)](#method.order_by_fast_field) method. pub fn order_by_u64_field( self, diff --git a/src/core/searcher.rs b/src/core/searcher.rs index 1b8f1257e5..8f9bca8104 100644 --- a/src/core/searcher.rs +++ b/src/core/searcher.rs @@ -6,7 +6,7 @@ use crate::core::{Executor, SegmentReader}; use crate::query::Query; use crate::schema::{Document, Schema, Term}; use crate::space_usage::SearcherSpaceUsage; -use crate::store::StoreReader; +use crate::store::{CacheStats, StoreReader}; use crate::{DocAddress, Index, Opstamp, SegmentId, TrackedObject}; /// Identifies the searcher generation accessed by a [Searcher]. @@ -77,11 +77,13 @@ impl Searcher { index: Index, segment_readers: Vec, generation: TrackedObject, + doc_store_cache_size: usize, ) -> io::Result { let store_readers: Vec = segment_readers .iter() - .map(SegmentReader::get_store_reader) + .map(|segment_reader| segment_reader.get_store_reader(doc_store_cache_size)) .collect::>>()?; + Ok(Searcher { schema, index, @@ -110,6 +112,18 @@ impl Searcher { store_reader.get(doc_address.doc_id) } + /// The cache stats for the underlying store reader. + /// + /// Aggregates the sum for each segment store reader. + pub fn doc_store_cache_stats(&self) -> CacheStats { + let cache_stats: CacheStats = self + .store_readers + .iter() + .map(|reader| reader.cache_stats()) + .sum(); + cache_stats + } + /// Fetches a document in an asynchronous manner. #[cfg(feature = "quickwit")] pub async fn doc_async(&self, doc_address: DocAddress) -> crate::Result { diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index dab64d8abd..26e0c41ecd 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -133,8 +133,8 @@ impl SegmentReader { } /// Accessor to the segment's `StoreReader`. - pub fn get_store_reader(&self) -> io::Result { - StoreReader::open(self.store_file.clone()) + pub fn get_store_reader(&self, cache_size: usize) -> io::Result { + StoreReader::open(self.store_file.clone(), cache_size) } /// Open a new segment for reading. 
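The `Searcher` changes above thread a `doc_store_cache_size` through to each segment's `StoreReader` and expose aggregated cache statistics via the new `Searcher::doc_store_cache_stats()`. A minimal sketch of how that accessor might be used, assuming an existing index with a stored text field named "body" (the field name and the `report_cache_stats` helper are illustrative, not part of this change):

    use tantivy::collector::TopDocs;
    use tantivy::query::QueryParser;
    use tantivy::Index;

    // Illustrative helper: run a query, fetch the stored documents, then report
    // the doc store cache stats summed over all segment store readers.
    fn report_cache_stats(index: &Index, query_str: &str) -> tantivy::Result<()> {
        let searcher = index.reader()?.searcher();
        let body = index.schema().get_field("body").expect("assumed 'body' field");
        let query = QueryParser::for_index(index, vec![body]).parse_query(query_str)?;
        for (_score, doc_address) in searcher.search(&query, &TopDocs::with_limit(10))? {
            let _doc = searcher.doc(doc_address)?;
        }
        let stats = searcher.doc_store_cache_stats();
        println!(
            "doc store cache: {} entries, {} hits, {} misses",
            stats.num_entries, stats.cache_hits, stats.cache_misses
        );
        Ok(())
    }
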
@@ -326,7 +326,7 @@ impl SegmentReader { self.positions_composite.space_usage(), self.fast_fields_readers.space_usage(), self.fieldnorm_readers.space_usage(), - self.get_store_reader()?.space_usage(), + self.get_store_reader(0)?.space_usage(), self.alive_bitset_opt .as_ref() .map(AliveBitSet::space_usage) diff --git a/src/directory/directory.rs b/src/directory/directory.rs index e54b8dee32..7c7f81f660 100644 --- a/src/directory/directory.rs +++ b/src/directory/directory.rs @@ -1,6 +1,7 @@ use std::io::Write; use std::marker::{Send, Sync}; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::time::Duration; use std::{fmt, io, thread}; @@ -62,7 +63,12 @@ impl Drop for DirectoryLockGuard { enum TryAcquireLockError { FileExists, - IoError(io::Error), + IoError(Arc), +} +impl From for TryAcquireLockError { + fn from(io_error: io::Error) -> Self { + Self::IoError(Arc::new(io_error)) + } } fn try_acquire_lock( @@ -73,7 +79,7 @@ fn try_acquire_lock( OpenWriteError::FileAlreadyExists(_) => TryAcquireLockError::FileExists, OpenWriteError::IoError { io_error, .. } => TryAcquireLockError::IoError(io_error), })?; - write.flush().map_err(TryAcquireLockError::IoError)?; + write.flush().map_err(TryAcquireLockError::from)?; Ok(DirectoryLock::from(Box::new(DirectoryLockGuard { directory: directory.box_clone(), path: filepath.to_owned(), diff --git a/src/directory/error.rs b/src/directory/error.rs index 4bb273ce02..5292bcf3fb 100644 --- a/src/directory/error.rs +++ b/src/directory/error.rs @@ -1,10 +1,11 @@ use std::path::PathBuf; +use std::sync::Arc; use std::{fmt, io}; use crate::Version; /// Error while trying to acquire a directory lock. -#[derive(Debug, Error)] +#[derive(Debug, Clone, Error)] pub enum LockError { /// Failed to acquired a lock as it is already held by another /// client. @@ -16,11 +17,18 @@ pub enum LockError { LockBusy, /// Trying to acquire a lock failed with an `IoError` #[error("Failed to acquire the lock due to an io:Error.")] - IoError(io::Error), + IoError(Arc), +} + +impl LockError { + /// Wraps an io error. + pub fn wrap_io_error(io_error: io::Error) -> Self { + Self::IoError(Arc::new(io_error)) + } } /// Error that may occur when opening a directory -#[derive(Debug, Error)] +#[derive(Debug, Clone, Error)] pub enum OpenDirectoryError { /// The underlying directory does not exists. #[error("Directory does not exist: '{0}'.")] @@ -30,12 +38,12 @@ pub enum OpenDirectoryError { NotADirectory(PathBuf), /// Failed to create a temp directory. #[error("Failed to create a temporary directory: '{0}'.")] - FailedToCreateTempDir(io::Error), + FailedToCreateTempDir(Arc), /// IoError #[error("IoError '{io_error:?}' while create directory in: '{directory_path:?}'.")] IoError { /// underlying io Error. - io_error: io::Error, + io_error: Arc, /// directory we tried to open. directory_path: PathBuf, }, @@ -45,14 +53,14 @@ impl OpenDirectoryError { /// Wraps an io error. pub fn wrap_io_error(io_error: io::Error, directory_path: PathBuf) -> Self { Self::IoError { - io_error, + io_error: Arc::new(io_error), directory_path, } } } /// Error that may occur when starting to write in a file -#[derive(Debug, Error)] +#[derive(Debug, Clone, Error)] pub enum OpenWriteError { /// Our directory is WORM, writing an existing file is forbidden. /// Checkout the `Directory` documentation. @@ -63,7 +71,7 @@ pub enum OpenWriteError { #[error("IoError '{io_error:?}' while opening file for write: '{filepath}'.")] IoError { /// The underlying `io::Error`. 
- io_error: io::Error, + io_error: Arc, /// File path of the file that tantivy failed to open for write. filepath: PathBuf, }, @@ -72,11 +80,15 @@ pub enum OpenWriteError { impl OpenWriteError { /// Wraps an io error. pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self { - Self::IoError { io_error, filepath } + Self::IoError { + io_error: Arc::new(io_error), + filepath, + } } } /// Type of index incompatibility between the library and the index found on disk /// Used to catch and provide a hint to solve this incompatibility issue +#[derive(Clone)] pub enum Incompatibility { /// This library cannot decompress the index found on disk CompressionMismatch { @@ -135,7 +147,7 @@ impl fmt::Debug for Incompatibility { } /// Error that may occur when accessing a file read -#[derive(Debug, Error)] +#[derive(Debug, Clone, Error)] pub enum OpenReadError { /// The file does not exists. #[error("Files does not exists: {0:?}")] @@ -146,7 +158,7 @@ pub enum OpenReadError { )] IoError { /// The underlying `io::Error`. - io_error: io::Error, + io_error: Arc, /// File path of the file that tantivy failed to open for read. filepath: PathBuf, }, @@ -158,11 +170,14 @@ pub enum OpenReadError { impl OpenReadError { /// Wraps an io error. pub fn wrap_io_error(io_error: io::Error, filepath: PathBuf) -> Self { - Self::IoError { io_error, filepath } + Self::IoError { + io_error: Arc::new(io_error), + filepath, + } } } /// Error that may occur when trying to delete a file -#[derive(Debug, Error)] +#[derive(Debug, Clone, Error)] pub enum DeleteError { /// The file does not exists. #[error("File does not exists: '{0}'.")] @@ -172,7 +187,7 @@ pub enum DeleteError { #[error("The following IO error happened while deleting file '{filepath}': '{io_error:?}'.")] IoError { /// The underlying `io::Error`. - io_error: io::Error, + io_error: Arc, /// File path of the file that tantivy failed to delete. filepath: PathBuf, }, diff --git a/src/directory/managed_directory.rs b/src/directory/managed_directory.rs index b153355677..1b7c23a050 100644 --- a/src/directory/managed_directory.rs +++ b/src/directory/managed_directory.rs @@ -242,16 +242,13 @@ impl ManagedDirectory { /// Verify checksum of a managed file pub fn validate_checksum(&self, path: &Path) -> result::Result { let reader = self.directory.open_read(path)?; - let (footer, data) = - Footer::extract_footer(reader).map_err(|io_error| OpenReadError::IoError { - io_error, - filepath: path.to_path_buf(), - })?; + let (footer, data) = Footer::extract_footer(reader) + .map_err(|io_error| OpenReadError::wrap_io_error(io_error, path.to_path_buf()))?; let bytes = data .read_bytes() .map_err(|io_error| OpenReadError::IoError { + io_error: Arc::new(io_error), filepath: path.to_path_buf(), - io_error, })?; let mut hasher = Hasher::new(); hasher.update(bytes.as_slice()); diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index b17c3625fa..1c7e1bc0a9 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -174,7 +174,8 @@ impl MmapDirectory { /// This is mostly useful to test the MmapDirectory itself. /// For your unit tests, prefer the RamDirectory. 
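Wrapping the underlying `io::Error` in an `Arc` is what lets `LockError`, `OpenDirectoryError`, `OpenWriteError`, `OpenReadError` and `DeleteError` derive `Clone` in the error-type changes above. A small sketch of the pattern this enables (the error values here are fabricated purely for illustration):

    use std::io;
    use std::path::PathBuf;

    use tantivy::directory::error::OpenReadError;

    // With the io::Error behind an Arc, directory errors can be cloned, e.g. to
    // hand the same failure to a logger and to the caller.
    fn clone_directory_error() {
        let io_err = io::Error::new(io::ErrorKind::NotFound, "segment file missing");
        let err = OpenReadError::wrap_io_error(io_err, PathBuf::from("meta.json"));
        let err_for_log = err.clone(); // requires the new `Clone` derive
        assert!(matches!(err_for_log, OpenReadError::IoError { .. }));
    }
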
pub fn create_from_tempdir() -> Result { - let tempdir = TempDir::new().map_err(OpenDirectoryError::FailedToCreateTempDir)?; + let tempdir = TempDir::new() + .map_err(|io_err| OpenDirectoryError::FailedToCreateTempDir(Arc::new(io_err)))?; Ok(MmapDirectory::new( tempdir.path().to_path_buf(), Some(tempdir), @@ -342,7 +343,7 @@ impl Directory for MmapDirectory { DeleteError::FileDoesNotExist(path.to_owned()) } else { DeleteError::IoError { - io_error: e, + io_error: Arc::new(e), filepath: path.to_path_buf(), } } @@ -422,9 +423,9 @@ impl Directory for MmapDirectory { .write(true) .create(true) //< if the file does not exist yet, create it. .open(&full_path) - .map_err(LockError::IoError)?; + .map_err(LockError::wrap_io_error)?; if lock.is_blocking { - file.lock_exclusive().map_err(LockError::IoError)?; + file.lock_exclusive().map_err(LockError::wrap_io_error)?; } else { file.try_lock_exclusive().map_err(|_| LockError::LockBusy)? } diff --git a/src/directory/ram_directory.rs b/src/directory/ram_directory.rs index f501b100d2..bb9c32050c 100644 --- a/src/directory/ram_directory.rs +++ b/src/directory/ram_directory.rs @@ -172,7 +172,7 @@ impl Directory for RamDirectory { fn delete(&self, path: &Path) -> result::Result<(), DeleteError> { fail_point!("RamDirectory::delete", |_| { Err(DeleteError::IoError { - io_error: io::Error::from(io::ErrorKind::Other), + io_error: Arc::new(io::Error::from(io::ErrorKind::Other)), filepath: path.to_path_buf(), }) }); @@ -184,7 +184,7 @@ impl Directory for RamDirectory { .fs .read() .map_err(|e| OpenReadError::IoError { - io_error: io::Error::new(io::ErrorKind::Other, e.to_string()), + io_error: Arc::new(io::Error::new(io::ErrorKind::Other, e.to_string())), filepath: path.to_path_buf(), })? .exists(path)) @@ -208,7 +208,7 @@ impl Directory for RamDirectory { self.open_read(path)? .read_bytes() .map_err(|io_error| OpenReadError::IoError { - io_error, + io_error: Arc::new(io_error), filepath: path.to_path_buf(), })?; Ok(bytes.as_slice().to_owned()) diff --git a/src/error.rs b/src/error.rs index baccb32580..e8136cdefa 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,7 +1,7 @@ //! Definition of Tantivy's errors and results. use std::path::PathBuf; -use std::sync::PoisonError; +use std::sync::{Arc, PoisonError}; use std::{fmt, io}; use thiserror::Error; @@ -15,6 +15,7 @@ use crate::{query, schema}; /// Represents a `DataCorruption` error. /// /// When facing data corruption, tantivy actually panics or returns this error. +#[derive(Clone)] pub struct DataCorruption { filepath: Option, comment: String, @@ -50,7 +51,7 @@ impl fmt::Debug for DataCorruption { } /// The library's error enum -#[derive(Debug, Error)] +#[derive(Debug, Clone, Error)] pub enum TantivyError { /// Failed to open the directory. #[error("Failed to open the directory: '{0:?}'")] @@ -69,7 +70,7 @@ pub enum TantivyError { LockFailure(LockError, Option), /// IO Error. #[error("An IO error occurred: '{0}'")] - IoError(#[from] io::Error), + IoError(Arc), /// Data corruption. 
#[error("Data corrupted: '{0:?}'")] DataCorruption(DataCorruption), @@ -125,6 +126,11 @@ impl From for TantivyError { } } +impl From for TantivyError { + fn from(io_err: io::Error) -> TantivyError { + TantivyError::IoError(Arc::new(io_err)) + } +} impl From for TantivyError { fn from(data_corruption: DataCorruption) -> TantivyError { TantivyError::DataCorruption(data_corruption) @@ -179,7 +185,7 @@ impl From for TantivyError { impl From for TantivyError { fn from(error: serde_json::Error) -> TantivyError { - TantivyError::IoError(error.into()) + TantivyError::IoError(Arc::new(error.into())) } } diff --git a/src/functional_test.rs b/src/functional_test.rs index e6be8bcc50..e0d0c8bfee 100644 --- a/src/functional_test.rs +++ b/src/functional_test.rs @@ -9,7 +9,7 @@ fn check_index_content(searcher: &Searcher, vals: &[u64]) -> crate::Result<()> { assert!(searcher.segment_readers().len() < 20); assert_eq!(searcher.num_docs() as usize, vals.len()); for segment_reader in searcher.segment_readers() { - let store_reader = segment_reader.get_store_reader()?; + let store_reader = segment_reader.get_store_reader(1)?; for doc_id in 0..segment_reader.max_doc() { let _doc = store_reader.get(doc_id)?; } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 9d5a329893..8718c5370b 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -792,6 +792,7 @@ mod tests { self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions, TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT, }; + use crate::store::DOCSTORE_CACHE_CAPACITY; use crate::{DocAddress, Index, IndexSettings, IndexSortByField, Order, ReloadPolicy, Term}; const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \ @@ -1550,7 +1551,9 @@ mod tests { // doc store tests for segment_reader in searcher.segment_readers().iter() { - let store_reader = segment_reader.get_store_reader().unwrap(); + let store_reader = segment_reader + .get_store_reader(DOCSTORE_CACHE_CAPACITY) + .unwrap(); // test store iterator for doc in store_reader.iter(segment_reader.alive_bitset()) { let id = doc.unwrap().get_first(id_field).unwrap().as_u64().unwrap(); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 1e95849bd1..2ac6ec339b 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -1038,18 +1038,21 @@ impl IndexMerger { debug_time!("write-storable-fields"); debug!("write-storable-field"); - let store_readers: Vec<_> = self - .readers - .iter() - .map(|reader| reader.get_store_reader()) - .collect::>()?; - let mut document_iterators: Vec<_> = store_readers - .iter() - .enumerate() - .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset())) - .collect(); if !doc_id_mapping.is_trivial() { debug!("non-trivial-doc-id-mapping"); + + let store_readers: Vec<_> = self + .readers + .iter() + .map(|reader| reader.get_store_reader(50)) + .collect::>()?; + + let mut document_iterators: Vec<_> = store_readers + .iter() + .enumerate() + .map(|(i, store)| store.iter_raw(self.readers[i].alive_bitset())) + .collect(); + for (old_doc_id, reader_ordinal) in doc_id_mapping.iter() { let doc_bytes_it = &mut document_iterators[*reader_ordinal as usize]; if let Some(doc_bytes_res) = doc_bytes_it.next() { @@ -1066,7 +1069,7 @@ impl IndexMerger { } else { debug!("trivial-doc-id-mapping"); for reader in &self.readers { - let store_reader = reader.get_store_reader()?; + let store_reader = reader.get_store_reader(1)?; if reader.has_deletes() // If there is not enough 
data in the store, we avoid stacking in order to // avoid creating many small blocks in the doc store. Once we have 5 full blocks, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 02fbd56abf..733bc9a089 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -389,6 +389,7 @@ fn remap_and_write( serializer .segment() .open_read(SegmentComponent::TempStore)?, + 50, )?; for old_doc_id in doc_id_map.iter_old_doc_ids() { let doc_bytes = store_read.get_document_bytes(old_doc_id)?; diff --git a/src/query/phrase_query/phrase_query.rs b/src/query/phrase_query/phrase_query.rs index 1996c95f68..ae3916318b 100644 --- a/src/query/phrase_query/phrase_query.rs +++ b/src/query/phrase_query/phrase_query.rs @@ -43,7 +43,12 @@ impl PhraseQuery { /// Creates a new `PhraseQuery` given a list of terms and their offsets. /// /// Can be used to provide custom offset for each term. - pub fn new_with_offset(mut terms: Vec<(usize, Term)>) -> PhraseQuery { + pub fn new_with_offset(terms: Vec<(usize, Term)>) -> PhraseQuery { + PhraseQuery::new_with_offset_and_slop(terms, 0) + } + + /// Creates a new `PhraseQuery` given a list of terms, their offsets and a slop + pub fn new_with_offset_and_slop(mut terms: Vec<(usize, Term)>, slop: u32) -> PhraseQuery { assert!( terms.len() > 1, "A phrase query is required to have strictly more than one term." @@ -57,7 +62,7 @@ impl PhraseQuery { PhraseQuery { field, phrase_terms: terms, - slop: 0, + slop, } } diff --git a/src/query/query_parser/logical_ast.rs b/src/query/query_parser/logical_ast.rs index 9d26c3cd67..2eb75e675a 100644 --- a/src/query/query_parser/logical_ast.rs +++ b/src/query/query_parser/logical_ast.rs @@ -8,7 +8,7 @@ use crate::Score; #[derive(Clone)] pub enum LogicalLiteral { Term(Term), - Phrase(Vec<(usize, Term)>), + Phrase(Vec<(usize, Term)>, u32), Range { field: Field, value_type: Type, @@ -74,7 +74,14 @@ impl fmt::Debug for LogicalLiteral { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { match *self { LogicalLiteral::Term(ref term) => write!(formatter, "{:?}", term), - LogicalLiteral::Phrase(ref terms) => write!(formatter, "\"{:?}\"", terms), + LogicalLiteral::Phrase(ref terms, slop) => { + write!(formatter, "\"{:?}\"", terms)?; + if slop > 0 { + write!(formatter, "~{:?}", slop) + } else { + Ok(()) + } + } LogicalLiteral::Range { ref lower, ref upper, diff --git a/src/query/query_parser/query_parser.rs b/src/query/query_parser/query_parser.rs index ecd2b11423..74eacc0ef3 100644 --- a/src/query/query_parser/query_parser.rs +++ b/src/query/query_parser/query_parser.rs @@ -168,6 +168,9 @@ fn trim_ast(logical_ast: LogicalAst) -> Option { /// It is also possible to define a boost for a some specific field, at the query parser level. /// (See [`set_boost(...)`](#method.set_field_boost) ). Typically you may want to boost a title /// field. +/// +/// Phrase terms support the `~` slop operator which allows to set the phrase's matching +/// distance in words. `"big wolf"~1` will return documents containing the phrase `"big bad wolf"`. #[derive(Clone)] pub struct QueryParser { schema: Schema, @@ -405,6 +408,7 @@ impl QueryParser { field: Field, json_path: &str, phrase: &str, + slop: u32, ) -> Result, QueryParserError> { let field_entry = self.schema.get_field_entry(field); let field_type = field_entry.field_type(); @@ -461,6 +465,7 @@ impl QueryParser { field_name, field, phrase, + slop, &text_analyzer, index_record_option, )? 
@@ -626,7 +631,9 @@ impl QueryParser { self.compute_path_triplets_for_literal(&literal)?; let mut asts: Vec = Vec::new(); for (field, json_path, phrase) in term_phrases { - for ast in self.compute_logical_ast_for_leaf(field, json_path, phrase)? { + for ast in + self.compute_logical_ast_for_leaf(field, json_path, phrase, literal.slop)? + { // Apply some field specific boost defined at the query parser level. let boost = self.field_boost(field); asts.push(LogicalAst::Leaf(Box::new(ast)).boost(boost)); @@ -670,9 +677,9 @@ impl QueryParser { fn convert_literal_to_query(logical_literal: LogicalLiteral) -> Box { match logical_literal { LogicalLiteral::Term(term) => Box::new(TermQuery::new(term, IndexRecordOption::WithFreqs)), - LogicalLiteral::Phrase(term_with_offsets) => { - Box::new(PhraseQuery::new_with_offset(term_with_offsets)) - } + LogicalLiteral::Phrase(term_with_offsets, slop) => Box::new( + PhraseQuery::new_with_offset_and_slop(term_with_offsets, slop), + ), LogicalLiteral::Range { field, value_type, @@ -689,6 +696,7 @@ fn generate_literals_for_str( field_name: &str, field: Field, phrase: &str, + slop: u32, text_analyzer: &TextAnalyzer, index_record_option: IndexRecordOption, ) -> Result, QueryParserError> { @@ -710,7 +718,7 @@ fn generate_literals_for_str( field_name.to_string(), )); } - Ok(Some(LogicalLiteral::Phrase(terms))) + Ok(Some(LogicalLiteral::Phrase(terms, slop))) } fn generate_literals_for_json_object( @@ -741,7 +749,7 @@ fn generate_literals_for_json_object( field_name.to_string(), )); } - logical_literals.push(LogicalLiteral::Phrase(terms)); + logical_literals.push(LogicalLiteral::Phrase(terms, 0)); Ok(logical_literals) } @@ -1499,4 +1507,23 @@ mod test { assert_eq!(&super::locate_splitting_dots(r#"a\.b.c"#), &[4]); assert_eq!(&super::locate_splitting_dots(r#"a\..b.c"#), &[3, 5]); } + + #[test] + pub fn test_phrase_slop() { + test_parse_query_to_logical_ast_helper( + "\"a b\"~0", + r#"("[(0, Term(type=Str, field=0, "a")), (1, Term(type=Str, field=0, "b"))]" "[(0, Term(type=Str, field=1, "a")), (1, Term(type=Str, field=1, "b"))]")"#, + false, + ); + test_parse_query_to_logical_ast_helper( + "\"a b\"~2", + r#"("[(0, Term(type=Str, field=0, "a")), (1, Term(type=Str, field=0, "b"))]"~2 "[(0, Term(type=Str, field=1, "a")), (1, Term(type=Str, field=1, "b"))]"~2)"#, + false, + ); + test_parse_query_to_logical_ast_helper( + "title:\"a b~4\"~2", + r#""[(0, Term(type=Str, field=0, "a")), (1, Term(type=Str, field=0, "b")), (2, Term(type=Str, field=0, "4"))]"~2"#, + false, + ); + } } diff --git a/src/reader/mod.rs b/src/reader/mod.rs index a61172d3bf..d900f00157 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -13,6 +13,7 @@ use self::pool::Pool; use self::warming::WarmingState; use crate::core::searcher::SearcherGeneration; use crate::directory::{Directory, WatchCallback, WatchHandle, META_LOCK}; +use crate::store::DOCSTORE_CACHE_CAPACITY; use crate::{Index, Inventory, Searcher, SegmentReader, TrackedObject}; /// Defines when a new version of the index should be reloaded. 
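The query-parser changes above thread the slop parsed from the `~N` suffix down into `PhraseQuery`. A minimal usage sketch, assuming an in-memory index with a single text field (the field name and function are illustrative):

    use tantivy::query::{PhraseQuery, QueryParser};
    use tantivy::schema::{Schema, TEXT};
    use tantivy::{Index, Term};

    // `"big wolf"~1` now parses into a PhraseQuery with a slop of 1, so it can
    // still match documents containing the phrase "big bad wolf".
    fn slop_examples() -> tantivy::Result<()> {
        let mut schema_builder = Schema::builder();
        let body = schema_builder.add_text_field("body", TEXT);
        let index = Index::create_in_ram(schema_builder.build());

        // Via the query parser: a `~N` suffix on a quoted phrase sets the slop.
        let parser = QueryParser::for_index(&index, vec![body]);
        let _parsed = parser.parse_query(r#""big wolf"~1"#)?;

        // Programmatically, via the constructor added in this change.
        let terms = vec![
            (0, Term::from_field_text(body, "big")),
            (1, Term::from_field_text(body, "wolf")),
        ];
        let _phrase = PhraseQuery::new_with_offset_and_slop(terms, 1);
        Ok(())
    }
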
@@ -47,6 +48,7 @@ pub struct IndexReaderBuilder { index: Index, warmers: Vec>, num_warming_threads: usize, + doc_store_cache_size: usize, } impl IndexReaderBuilder { @@ -58,6 +60,7 @@ impl IndexReaderBuilder { index, warmers: Vec::new(), num_warming_threads: 1, + doc_store_cache_size: DOCSTORE_CACHE_CAPACITY, } } @@ -76,6 +79,7 @@ impl IndexReaderBuilder { let inner_reader = InnerIndexReader { index: self.index, num_searchers: self.num_searchers, + doc_store_cache_size: self.doc_store_cache_size, searcher_pool: Pool::new(), warming_state, searcher_generation_counter: Default::default(), @@ -120,6 +124,15 @@ impl IndexReaderBuilder { self } + /// Sets the cache size of the doc store readers. + /// + /// The doc store readers cache by default DOCSTORE_CACHE_CAPACITY(100) decompressed blocks. + #[must_use] + pub fn doc_store_cache_size(mut self, doc_store_cache_size: usize) -> IndexReaderBuilder { + self.doc_store_cache_size = doc_store_cache_size; + self + } + /// Sets the number of [Searcher] to pool. /// /// See [IndexReader::searcher()]. @@ -157,6 +170,7 @@ impl TryInto for IndexReaderBuilder { struct InnerIndexReader { num_searchers: usize, + doc_store_cache_size: usize, index: Index, warming_state: WarmingState, searcher_pool: Pool, @@ -203,6 +217,7 @@ impl InnerIndexReader { self.index.clone(), segment_readers.clone(), searcher_generation.clone(), + self.doc_store_cache_size, ) }) .take(self.num_searchers) diff --git a/src/store/mod.rs b/src/store/mod.rs index 8dd035fe7f..bf57bbc1d0 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -40,7 +40,8 @@ mod reader; mod writer; pub use self::compressors::{Compressor, ZstdCompressor}; pub use self::decompressors::Decompressor; -pub use self::reader::StoreReader; +pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY; +pub use self::reader::{CacheStats, StoreReader}; pub use self::writer::StoreWriter; #[cfg(feature = "lz4-compression")] @@ -114,7 +115,7 @@ pub mod tests { let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE); let field_title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; - let store = StoreReader::open(store_file)?; + let store = StoreReader::open(store_file, 10)?; for i in 0..NUM_DOCS as u32 { assert_eq!( *store @@ -154,7 +155,7 @@ pub mod tests { let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize); let field_title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; - let store = StoreReader::open(store_file)?; + let store = StoreReader::open(store_file, 10)?; for i in 0..NUM_DOCS as u32 { assert_eq!( *store @@ -231,7 +232,7 @@ pub mod tests { let searcher = index.reader()?.searcher(); let reader = searcher.segment_reader(0); - let store = reader.get_store_reader()?; + let store = reader.get_store_reader(10)?; for doc in store.iter(reader.alive_bitset()) { assert_eq!( *doc?.get_first(text_field).unwrap().as_text().unwrap(), @@ -267,7 +268,7 @@ pub mod tests { } assert_eq!( index.reader().unwrap().searcher().segment_readers()[0] - .get_store_reader() + .get_store_reader(10) .unwrap() .decompressor(), Decompressor::Lz4 @@ -288,7 +289,7 @@ pub mod tests { let searcher = index.reader().unwrap().searcher(); assert_eq!(searcher.segment_readers().len(), 1); let reader = searcher.segment_readers().iter().last().unwrap(); - let store = reader.get_store_reader().unwrap(); + let store = reader.get_store_reader(10).unwrap(); for doc in store.iter(reader.alive_bitset()).take(50) { assert_eq!( @@ -335,7 +336,7 
@@ pub mod tests { let searcher = index.reader()?.searcher(); assert_eq!(searcher.segment_readers().len(), 1); let reader = searcher.segment_readers().iter().last().unwrap(); - let store = reader.get_store_reader()?; + let store = reader.get_store_reader(10)?; assert_eq!(store.block_checkpoints().count(), 1); Ok(()) } @@ -379,7 +380,7 @@ mod bench { 16_384, ); let store_file = directory.open_read(path).unwrap(); - let store = StoreReader::open(store_file).unwrap(); + let store = StoreReader::open(store_file, 10).unwrap(); b.iter(|| store.iter(None).collect::>()); } } diff --git a/src/store/reader.rs b/src/store/reader.rs index 6d30b7613c..62afd4c04a 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -1,4 +1,6 @@ use std::io; +use std::iter::Sum; +use std::ops::AddAssign; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; @@ -17,7 +19,7 @@ use crate::space_usage::StoreSpaceUsage; use crate::store::index::Checkpoint; use crate::DocId; -const LRU_CACHE_CAPACITY: usize = 100; +pub(crate) const DOCSTORE_CACHE_CAPACITY: usize = 100; type Block = OwnedBytes; @@ -30,18 +32,13 @@ pub struct StoreReader { cache: BlockCache, } +/// The cache for decompressed blocks. struct BlockCache { cache: Mutex>, cache_hits: Arc, cache_misses: Arc, } -pub struct CacheStats { - pub num_entries: usize, - pub cache_hits: usize, - pub cache_misses: usize, -} - impl BlockCache { fn get_from_cache(&self, pos: usize) -> Option { if let Some(block) = self.cache.lock().unwrap().get(&pos) { @@ -77,9 +74,40 @@ impl BlockCache { } } +#[derive(Debug, Default)] +/// CacheStats for the `StoreReader`. +pub struct CacheStats { + /// The number of entries in the cache + pub num_entries: usize, + /// The number of cache hits. + pub cache_hits: usize, + /// The number of cache misses. + pub cache_misses: usize, +} + +impl AddAssign for CacheStats { + fn add_assign(&mut self, other: Self) { + *self = Self { + num_entries: self.num_entries + other.num_entries, + cache_hits: self.cache_hits + other.cache_hits, + cache_misses: self.cache_misses + other.cache_misses, + }; + } +} + +impl Sum for CacheStats { + fn sum>(mut iter: I) -> Self { + let mut first = iter.next().unwrap_or_default(); + for el in iter { + first += el; + } + first + } +} + impl StoreReader { /// Opens a store reader - pub fn open(store_file: FileSlice) -> io::Result { + pub fn open(store_file: FileSlice, cache_size: usize) -> io::Result { let (footer, data_and_offset) = DocStoreFooter::extract_footer(store_file)?; let (data_file, offset_index_file) = data_and_offset.split(footer.offset as usize); @@ -90,7 +118,7 @@ impl StoreReader { decompressor: footer.decompressor, data: data_file, cache: BlockCache { - cache: Mutex::new(LruCache::new(LRU_CACHE_CAPACITY)), + cache: Mutex::new(LruCache::new(cache_size)), cache_hits: Default::default(), cache_misses: Default::default(), }, @@ -368,7 +396,7 @@ mod tests { let schema = write_lorem_ipsum_store(writer, 500, Compressor::default(), BLOCK_SIZE); let title = schema.get_field("title").unwrap(); let store_file = directory.open_read(path)?; - let store = StoreReader::open(store_file)?; + let store = StoreReader::open(store_file, DOCSTORE_CACHE_CAPACITY)?; assert_eq!(store.cache.len(), 0); assert_eq!(store.cache_stats().cache_hits, 0);
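
Callers that do not open `StoreReader`s directly can control the per-segment cache through the new `IndexReaderBuilder::doc_store_cache_size` knob, which defaults to `DOCSTORE_CACHE_CAPACITY` (100 decompressed blocks). A minimal sketch, assuming an existing `Index` (the helper function is illustrative):

    use std::convert::TryInto;

    use tantivy::{Index, IndexReader, ReloadPolicy};

    // Illustrative: shrink the doc store LRU cache to 10 decompressed blocks
    // per segment store reader instead of the default 100.
    fn open_reader_with_small_cache(index: &Index) -> tantivy::Result<IndexReader> {
        let reader: IndexReader = index
            .reader_builder()
            .reload_policy(ReloadPolicy::OnCommit)
            .doc_store_cache_size(10)
            .try_into()?;
        Ok(reader)
    }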