diff --git a/CHANGELOG.md b/CHANGELOG.md
index b0c9a47bba..f03bf345ba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,10 @@
+Tantivy 0.19
+================================
+- Updated [Date Field Type](https://github.com/quickwit-oss/tantivy/pull/1396)
+  The `DateTime` type has been updated to hold timestamps with microsecond precision.
+  `DateOptions` and `DatePrecision` have been added to configure Date fields. The precision is used as a hint for fast-value compression; everywhere else (i.e. terms, indexing), seconds precision is used.
+
+
 Tantivy 0.18
 ================================
 - For date values `chrono` has been replaced with `time` (@uklotzde) #1304 :
diff --git a/Cargo.toml b/Cargo.toml
index 4899dd119d..dcb01229fd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -49,7 +49,7 @@ thiserror = "1.0.30"
 htmlescape = "0.3.1"
 fail = "0.5.0"
 murmurhash32 = "0.2.0"
-time = { version = "0.3.9", features = ["serde-well-known"] }
+time = { version = "0.3.10", features = ["serde-well-known"] }
 smallvec = "1.8.0"
 rayon = "1.5.2"
 lru = "0.7.5"
diff --git a/README.md b/README.md
index ab02dd7f2c..97045531a3 100644
--- a/README.md
+++ b/README.md
@@ -152,4 +152,13 @@ You can also find other bindings on [GitHub](https://github.com/search?q=tantivy
 - and [more](https://github.com/search?q=tantivy)!

 ### On average, how much faster is Tantivy compared to Lucene?
-- According to our [search latency benchmark](https://tantivy-search.github.io/bench/), Tantivy is approximately 2x faster than Lucene.
\ No newline at end of file
+- According to our [search latency benchmark](https://tantivy-search.github.io/bench/), Tantivy is approximately 2x faster than Lucene.
+
+### Does tantivy support incremental indexing?
+- Yes.
+
+### How can I edit documents?
+- Data in tantivy is immutable. To edit a document, the document needs to be deleted and reindexed.
+
+### When will my documents be searchable during indexing?
+- Documents will be searchable after a `commit` is called on an `IndexWriter`. Existing `IndexReader`s will also need to be reloaded in order to reflect the changes. Finally, the changes are only visible to a newly acquired `Searcher`.
diff --git a/common/src/writer.rs b/common/src/writer.rs
index 20f56221d7..9b8b86908d 100644
--- a/common/src/writer.rs
+++ b/common/src/writer.rs
@@ -62,7 +62,7 @@ impl TerminatingWrite for CountingWriter {
 pub struct AntiCallToken(());

 /// Trait used to indicate when no more write need to be done on a writer
-pub trait TerminatingWrite: Write {
+pub trait TerminatingWrite: Write + Send {
     /// Indicate that the writer will no longer be used. Internally call terminate_ref.
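A minimal sketch (not part of this diff) of the commit/reload behaviour described in the new README FAQ entry above. The schema and field names are illustrative; `ReloadPolicy::Manual` is used so the visibility of changes is deterministic:

```rust
use tantivy::collector::Count;
use tantivy::query::AllQuery;
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index, ReloadPolicy};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let index = Index::create_in_ram(schema_builder.build());

    let mut index_writer = index.writer(50_000_000)?;
    // Acquire a reader up front; with a manual reload policy it only sees
    // what had been committed at the time of its last (re)load.
    let reader = index
        .reader_builder()
        .reload_policy(ReloadPolicy::Manual)
        .try_into()?;

    index_writer.add_document(doc!(title => "hello world"))?;
    // Not searchable yet: the document has not been committed.
    assert_eq!(reader.searcher().search(&AllQuery, &Count)?, 0);

    index_writer.commit()?;
    // Still not visible: this reader has not been reloaded since the commit.
    assert_eq!(reader.searcher().search(&AllQuery, &Count)?, 0);

    reader.reload()?;
    // A newly acquired searcher now sees the committed document.
    assert_eq!(reader.searcher().search(&AllQuery, &Count)?, 1);
    Ok(())
}
```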
fn terminate(mut self) -> io::Result<()> where Self: Sized { diff --git a/examples/aggregation.rs b/examples/aggregation.rs index 82cc0fccd3..ae11dc5a5a 100644 --- a/examples/aggregation.rs +++ b/examples/aggregation.rs @@ -117,7 +117,7 @@ fn main() -> tantivy::Result<()> { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap(); diff --git a/examples/date_time_field.rs b/examples/date_time_field.rs new file mode 100644 index 0000000000..4381ed34cb --- /dev/null +++ b/examples/date_time_field.rs @@ -0,0 +1,69 @@ +// # DateTime field example +// +// This example shows how the DateTime field can be used + +use tantivy::collector::TopDocs; +use tantivy::query::QueryParser; +use tantivy::schema::{Cardinality, DateOptions, Schema, Value, INDEXED, STORED, STRING}; +use tantivy::Index; + +fn main() -> tantivy::Result<()> { + // # Defining the schema + let mut schema_builder = Schema::builder(); + let opts = DateOptions::from(INDEXED) + .set_stored() + .set_fast(Cardinality::SingleValue) + .set_precision(tantivy::DatePrecision::Seconds); + let occurred_at = schema_builder.add_date_field("occurred_at", opts); + let event_type = schema_builder.add_text_field("event", STRING | STORED); + let schema = schema_builder.build(); + + // # Indexing documents + let index = Index::create_in_ram(schema.clone()); + + let mut index_writer = index.writer(50_000_000)?; + let doc = schema.parse_document( + r#"{ + "occurred_at": "2022-06-22T12:53:50.53Z", + "event": "pull-request" + }"#, + )?; + index_writer.add_document(doc)?; + let doc = schema.parse_document( + r#"{ + "occurred_at": "2022-06-22T13:00:00.22Z", + "event": "comment" + }"#, + )?; + index_writer.add_document(doc)?; + index_writer.commit()?; + + let reader = index.reader()?; + let searcher = reader.searcher(); + + // # Default fields: event_type + let query_parser = QueryParser::for_index(&index, vec![event_type]); + { + let query = query_parser.parse_query("event:comment")?; + let count_docs = searcher.search(&*query, &TopDocs::with_limit(5))?; + assert_eq!(count_docs.len(), 1); + } + { + let query = query_parser + .parse_query(r#"occurred_at:[2022-06-22T12:58:00Z TO 2022-06-23T00:00:00Z}"#)?; + let count_docs = searcher.search(&*query, &TopDocs::with_limit(4))?; + assert_eq!(count_docs.len(), 1); + for (_score, doc_address) in count_docs { + let retrieved_doc = searcher.doc(doc_address)?; + assert!(matches!( + retrieved_doc.get_first(occurred_at), + Some(Value::Date(_)) + )); + assert_eq!( + schema.to_json(&retrieved_doc), + r#"{"event":["comment"],"occurred_at":["2022-06-22T13:00:00.22Z"]}"# + ); + } + } + Ok(()) +} diff --git a/fastfield_codecs/src/bitpacked.rs b/fastfield_codecs/src/bitpacked.rs index e09f733035..5509a78a7a 100644 --- a/fastfield_codecs/src/bitpacked.rs +++ b/fastfield_codecs/src/bitpacked.rs @@ -14,7 +14,7 @@ pub struct BitpackedFastFieldReader { pub max_value_u64: u64, } -impl<'data> FastFieldCodecReader for BitpackedFastFieldReader { +impl FastFieldCodecReader for BitpackedFastFieldReader { /// Opens a fast field given a file. 
fn open_from_bytes(bytes: &[u8]) -> io::Result { let (_data, mut footer) = bytes.split_at(bytes.len() - 16); diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs index 8ed82ac5c6..491faf2137 100644 --- a/src/aggregation/agg_req_with_accessor.rs +++ b/src/aggregation/agg_req_with_accessor.rs @@ -1,10 +1,13 @@ //! This will enhance the request tree with access to the fastfield and metadata. +use std::rc::Rc; +use std::sync::atomic::AtomicU32; use std::sync::Arc; use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation}; use super::bucket::{HistogramAggregation, RangeAggregation, TermsAggregation}; use super::metric::{AverageAggregation, StatsAggregation}; +use super::segment_agg_result::BucketCount; use super::VecWithNames; use crate::fastfield::{ type_and_cardinality, DynamicFastFieldReader, FastType, MultiValuedFastFieldReader, @@ -60,6 +63,7 @@ pub struct BucketAggregationWithAccessor { pub(crate) field_type: Type, pub(crate) bucket_agg: BucketAggregationType, pub(crate) sub_aggregation: AggregationsWithAccessor, + pub(crate) bucket_count: BucketCount, } impl BucketAggregationWithAccessor { @@ -67,6 +71,8 @@ impl BucketAggregationWithAccessor { bucket: &BucketAggregationType, sub_aggregation: &Aggregations, reader: &SegmentReader, + bucket_count: Rc, + max_bucket_count: u32, ) -> crate::Result { let mut inverted_index = None; let (accessor, field_type) = match &bucket { @@ -92,9 +98,18 @@ impl BucketAggregationWithAccessor { Ok(BucketAggregationWithAccessor { accessor, field_type, - sub_aggregation: get_aggs_with_accessor_and_validate(&sub_aggregation, reader)?, + sub_aggregation: get_aggs_with_accessor_and_validate( + &sub_aggregation, + reader, + bucket_count.clone(), + max_bucket_count, + )?, bucket_agg: bucket.clone(), inverted_index, + bucket_count: BucketCount { + bucket_count, + max_bucket_count, + }, }) } } @@ -134,6 +149,8 @@ impl MetricAggregationWithAccessor { pub(crate) fn get_aggs_with_accessor_and_validate( aggs: &Aggregations, reader: &SegmentReader, + bucket_count: Rc, + max_bucket_count: u32, ) -> crate::Result { let mut metrics = vec![]; let mut buckets = vec![]; @@ -145,6 +162,8 @@ pub(crate) fn get_aggs_with_accessor_and_validate( &bucket.bucket_agg, &bucket.sub_aggregation, reader, + Rc::clone(&bucket_count), + max_bucket_count, )?, )), Aggregation::Metric(metric) => metrics.push(( diff --git a/src/aggregation/agg_result.rs b/src/aggregation/agg_result.rs index 3dba73d1a6..fc614990b4 100644 --- a/src/aggregation/agg_result.rs +++ b/src/aggregation/agg_result.rs @@ -4,21 +4,15 @@ //! intermediate average results, which is the sum and the number of values. The actual average is //! calculated on the step from intermediate to final aggregation result tree. 
-use std::cmp::Ordering; use std::collections::HashMap; use serde::{Deserialize, Serialize}; -use super::agg_req::{ - Aggregations, AggregationsInternal, BucketAggregationInternal, MetricAggregation, -}; -use super::bucket::{intermediate_buckets_to_final_buckets, GetDocCount}; -use super::intermediate_agg_result::{ - IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry, - IntermediateMetricResult, IntermediateRangeBucketEntry, -}; +use super::agg_req::BucketAggregationInternal; +use super::bucket::GetDocCount; +use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult}; use super::metric::{SingleMetricResult, Stats}; -use super::{Key, VecWithNames}; +use super::Key; use crate::TantivyError; #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] @@ -41,98 +35,6 @@ impl AggregationResults { ))) } } - - /// Convert and intermediate result and its aggregation request to the final result - pub fn from_intermediate_and_req( - results: IntermediateAggregationResults, - agg: Aggregations, - ) -> crate::Result { - AggregationResults::from_intermediate_and_req_internal(results, &(agg.into())) - } - - /// Convert and intermediate result and its aggregation request to the final result - /// - /// Internal function, CollectorAggregations is used instead Aggregations, which is optimized - /// for internal processing, by splitting metric and buckets into seperate groups. - pub(crate) fn from_intermediate_and_req_internal( - intermediate_results: IntermediateAggregationResults, - req: &AggregationsInternal, - ) -> crate::Result { - // Important assumption: - // When the tree contains buckets/metric, we expect it to have all buckets/metrics from the - // request - let mut results: HashMap = HashMap::new(); - - if let Some(buckets) = intermediate_results.buckets { - add_coverted_final_buckets_to_result(&mut results, buckets, &req.buckets)? - } else { - // When there are no buckets, we create empty buckets, so that the serialized json - // format is constant - add_empty_final_buckets_to_result(&mut results, &req.buckets)? 
- }; - - if let Some(metrics) = intermediate_results.metrics { - add_converted_final_metrics_to_result(&mut results, metrics); - } else { - // When there are no metrics, we create empty metric results, so that the serialized - // json format is constant - add_empty_final_metrics_to_result(&mut results, &req.metrics)?; - } - Ok(Self(results)) - } -} - -fn add_converted_final_metrics_to_result( - results: &mut HashMap, - metrics: VecWithNames, -) { - results.extend( - metrics - .into_iter() - .map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))), - ); -} - -fn add_empty_final_metrics_to_result( - results: &mut HashMap, - req_metrics: &VecWithNames, -) -> crate::Result<()> { - results.extend(req_metrics.iter().map(|(key, req)| { - let empty_bucket = IntermediateMetricResult::empty_from_req(req); - ( - key.to_string(), - AggregationResult::MetricResult(empty_bucket.into()), - ) - })); - Ok(()) -} - -fn add_empty_final_buckets_to_result( - results: &mut HashMap, - req_buckets: &VecWithNames, -) -> crate::Result<()> { - let requested_buckets = req_buckets.iter(); - for (key, req) in requested_buckets { - let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?); - results.insert(key.to_string(), empty_bucket); - } - Ok(()) -} - -fn add_coverted_final_buckets_to_result( - results: &mut HashMap, - buckets: VecWithNames, - req_buckets: &VecWithNames, -) -> crate::Result<()> { - assert_eq!(buckets.len(), req_buckets.len()); - - let buckets_with_request = buckets.into_iter().zip(req_buckets.values()); - for ((key, bucket), req) in buckets_with_request { - let result = - AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(bucket, req)?); - results.insert(key, result); - } - Ok(()) } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -154,7 +56,8 @@ impl AggregationResult { match self { AggregationResult::BucketResult(_bucket) => Err(TantivyError::InternalError( "Tried to retrieve value from bucket aggregation. 
This is not supported and \ - should not happen during collection, but should be catched during validation" + should not happen during collection phase, but should be catched during \ + validation" .to_string(), )), AggregationResult::MetricResult(metric) => metric.get_value(agg_property), @@ -230,48 +133,7 @@ pub enum BucketResult { impl BucketResult { pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result { let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg); - BucketResult::from_intermediate_and_req(empty_bucket, req) - } - - fn from_intermediate_and_req( - bucket_result: IntermediateBucketResult, - req: &BucketAggregationInternal, - ) -> crate::Result { - match bucket_result { - IntermediateBucketResult::Range(range_res) => { - let mut buckets: Vec = range_res - .buckets - .into_iter() - .map(|(_, bucket)| { - RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation) - }) - .collect::>>()?; - - buckets.sort_by(|left, right| { - // TODO use total_cmp next stable rust release - left.from - .unwrap_or(f64::MIN) - .partial_cmp(&right.from.unwrap_or(f64::MIN)) - .unwrap_or(Ordering::Equal) - }); - Ok(BucketResult::Range { buckets }) - } - IntermediateBucketResult::Histogram { buckets } => { - let buckets = intermediate_buckets_to_final_buckets( - buckets, - req.as_histogram() - .expect("unexpected aggregation, expected histogram aggregation"), - &req.sub_aggregation, - )?; - - Ok(BucketResult::Histogram { buckets }) - } - IntermediateBucketResult::Terms(terms) => terms.into_final_result( - req.as_term() - .expect("unexpected aggregation, expected term aggregation"), - &req.sub_aggregation, - ), - } + empty_bucket.into_final_bucket_result(req) } } @@ -311,22 +173,6 @@ pub struct BucketEntry { /// Sub-aggregations in this bucket. 
pub sub_aggregation: AggregationResults, } - -impl BucketEntry { - pub(crate) fn from_intermediate_and_req( - entry: IntermediateHistogramBucketEntry, - req: &AggregationsInternal, - ) -> crate::Result { - Ok(BucketEntry { - key: Key::F64(entry.key), - doc_count: entry.doc_count, - sub_aggregation: AggregationResults::from_intermediate_and_req_internal( - entry.sub_aggregation, - req, - )?, - }) - } -} impl GetDocCount for &BucketEntry { fn doc_count(&self) -> u64 { self.doc_count @@ -384,21 +230,3 @@ pub struct RangeBucketEntry { #[serde(skip_serializing_if = "Option::is_none")] pub to: Option, } - -impl RangeBucketEntry { - fn from_intermediate_and_req( - entry: IntermediateRangeBucketEntry, - req: &AggregationsInternal, - ) -> crate::Result { - Ok(RangeBucketEntry { - key: entry.key, - doc_count: entry.doc_count, - sub_aggregation: AggregationResults::from_intermediate_and_req_internal( - entry.sub_aggregation, - req, - )?, - to: entry.to, - from: entry.from, - }) - } -} diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index a2a4a87e59..70acf0f117 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -250,6 +250,11 @@ impl SegmentHistogramCollector { ); }; + agg_with_accessor + .bucket_count + .add_count(buckets.len() as u32); + agg_with_accessor.bucket_count.validate_bucket_count()?; + Ok(IntermediateBucketResult::Histogram { buckets }) } @@ -311,7 +316,7 @@ impl SegmentHistogramCollector { doc: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { let bounds = self.bounds; let interval = self.interval; let offset = self.offset; @@ -341,28 +346,28 @@ impl SegmentHistogramCollector { bucket_pos0, docs[0], &bucket_with_accessor.sub_aggregation, - ); + )?; self.increment_bucket_if_in_bounds( val1, &bounds, bucket_pos1, docs[1], &bucket_with_accessor.sub_aggregation, - ); + )?; self.increment_bucket_if_in_bounds( val2, &bounds, bucket_pos2, docs[2], &bucket_with_accessor.sub_aggregation, - ); + )?; self.increment_bucket_if_in_bounds( val3, &bounds, bucket_pos3, docs[3], &bucket_with_accessor.sub_aggregation, - ); + )?; } for doc in iter.remainder() { let val = f64_from_fastfield_u64(accessor.get(*doc), &self.field_type); @@ -376,16 +381,17 @@ impl SegmentHistogramCollector { self.buckets[bucket_pos].key, get_bucket_val(val, self.interval, self.offset) as f64 ); - self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation); + self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?; } if force_flush { if let Some(sub_aggregations) = self.sub_aggregations.as_mut() { for sub_aggregation in sub_aggregations { sub_aggregation - .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush); + .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?; } } } + Ok(()) } #[inline] @@ -396,15 +402,16 @@ impl SegmentHistogramCollector { bucket_pos: usize, doc: DocId, bucket_with_accessor: &AggregationsWithAccessor, - ) { + ) -> crate::Result<()> { if bounds.contains(val) { debug_assert_eq!( self.buckets[bucket_pos].key, get_bucket_val(val, self.interval, self.offset) as f64 ); - self.increment_bucket(bucket_pos, doc, bucket_with_accessor); + self.increment_bucket(bucket_pos, doc, bucket_with_accessor)?; } + Ok(()) } #[inline] @@ -413,12 +420,13 @@ impl SegmentHistogramCollector { bucket_pos: usize, doc: DocId, bucket_with_accessor: &AggregationsWithAccessor, - 
) { + ) -> crate::Result<()> { let bucket = &mut self.buckets[bucket_pos]; bucket.doc_count += 1; if let Some(sub_aggregation) = self.sub_aggregations.as_mut() { - (&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor); + (&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor)?; } + Ok(()) } fn f64_from_fastfield_u64(&self, val: u64) -> f64 { @@ -482,14 +490,12 @@ fn intermediate_buckets_to_final_buckets_fill_gaps( sub_aggregation: empty_sub_aggregation.clone(), }, }) - .map(|intermediate_bucket| { - BucketEntry::from_intermediate_and_req(intermediate_bucket, sub_aggregation) - }) + .map(|intermediate_bucket| intermediate_bucket.into_final_bucket_entry(sub_aggregation)) .collect::>>() } // Convert to BucketEntry -pub(crate) fn intermediate_buckets_to_final_buckets( +pub(crate) fn intermediate_histogram_buckets_to_final_buckets( buckets: Vec, histogram_req: &HistogramAggregation, sub_aggregation: &AggregationsInternal, @@ -503,8 +509,8 @@ pub(crate) fn intermediate_buckets_to_final_buckets( } else { buckets .into_iter() - .filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count()) - .map(|bucket| BucketEntry::from_intermediate_and_req(bucket, sub_aggregation)) + .filter(|histogram_bucket| histogram_bucket.doc_count >= histogram_req.min_doc_count()) + .map(|histogram_bucket| histogram_bucket.into_final_bucket_entry(sub_aggregation)) .collect::>>() } } @@ -546,7 +552,7 @@ pub(crate) fn generate_buckets_with_opt_minmax( let offset = req.offset.unwrap_or(0.0); let first_bucket_num = get_bucket_num_f64(min, req.interval, offset) as i64; let last_bucket_num = get_bucket_num_f64(max, req.interval, offset) as i64; - let mut buckets = vec![]; + let mut buckets = Vec::with_capacity((first_bucket_num..=last_bucket_num).count()); for bucket_pos in first_bucket_num..=last_bucket_num { let bucket_key = bucket_pos as f64 * req.interval + offset; buckets.push(bucket_key); diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index d570e96a37..7faa500e7c 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use std::ops::Range; +use fnv::FnvHashMap; use serde::{Deserialize, Serialize}; use crate::aggregation::agg_req_with_accessor::{ @@ -9,8 +10,8 @@ use crate::aggregation::agg_req_with_accessor::{ use crate::aggregation::intermediate_agg_result::{ IntermediateBucketResult, IntermediateRangeBucketEntry, IntermediateRangeBucketResult, }; -use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector; -use crate::aggregation::{f64_from_fastfield_u64, f64_to_fastfield_u64, Key}; +use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector}; +use crate::aggregation::{f64_from_fastfield_u64, f64_to_fastfield_u64, Key, SerializedKey}; use crate::fastfield::FastFieldReader; use crate::schema::Type; use crate::{DocId, TantivyError}; @@ -153,7 +154,7 @@ impl SegmentRangeCollector { ) -> crate::Result { let field_type = self.field_type; - let buckets = self + let buckets: FnvHashMap = self .buckets .into_iter() .map(move |range_bucket| { @@ -174,12 +175,13 @@ impl SegmentRangeCollector { pub(crate) fn from_req_and_validate( req: &RangeAggregation, sub_aggregation: &AggregationsWithAccessor, + bucket_count: &BucketCount, field_type: Type, ) -> crate::Result { // The range input on the request is f64. // We need to convert to u64 ranges, because we read the values as u64. // The mapping from the conversion is monotonic so ordering is preserved. 
- let buckets = extend_validate_ranges(&req.ranges, &field_type)? + let buckets: Vec<_> = extend_validate_ranges(&req.ranges, &field_type)? .iter() .map(|range| { let to = if range.end == u64::MAX { @@ -212,6 +214,9 @@ impl SegmentRangeCollector { }) .collect::>()?; + bucket_count.add_count(buckets.len() as u32); + bucket_count.validate_bucket_count()?; + Ok(SegmentRangeCollector { buckets, field_type, @@ -224,7 +229,7 @@ impl SegmentRangeCollector { doc: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { let mut iter = doc.chunks_exact(4); let accessor = bucket_with_accessor .accessor @@ -240,24 +245,25 @@ impl SegmentRangeCollector { let bucket_pos3 = self.get_bucket_pos(val3); let bucket_pos4 = self.get_bucket_pos(val4); - self.increment_bucket(bucket_pos1, docs[0], &bucket_with_accessor.sub_aggregation); - self.increment_bucket(bucket_pos2, docs[1], &bucket_with_accessor.sub_aggregation); - self.increment_bucket(bucket_pos3, docs[2], &bucket_with_accessor.sub_aggregation); - self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation); + self.increment_bucket(bucket_pos1, docs[0], &bucket_with_accessor.sub_aggregation)?; + self.increment_bucket(bucket_pos2, docs[1], &bucket_with_accessor.sub_aggregation)?; + self.increment_bucket(bucket_pos3, docs[2], &bucket_with_accessor.sub_aggregation)?; + self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation)?; } for doc in iter.remainder() { let val = accessor.get(*doc); let bucket_pos = self.get_bucket_pos(val); - self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation); + self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?; } if force_flush { for bucket in &mut self.buckets { if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { sub_aggregation - .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush); + .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush)?; } } } + Ok(()) } #[inline] @@ -266,13 +272,14 @@ impl SegmentRangeCollector { bucket_pos: usize, doc: DocId, bucket_with_accessor: &AggregationsWithAccessor, - ) { + ) -> crate::Result<()> { let bucket = &mut self.buckets[bucket_pos]; bucket.bucket.doc_count += 1; if let Some(sub_aggregation) = &mut bucket.bucket.sub_aggregation { - sub_aggregation.collect(doc, bucket_with_accessor); + sub_aggregation.collect(doc, bucket_with_accessor)?; } + Ok(()) } #[inline] @@ -317,7 +324,7 @@ fn to_u64_range(range: &RangeAggregationRange, field_type: &Type) -> crate::Resu } /// Extends the provided buckets to contain the whole value range, by inserting buckets at the -/// beginning and end. +/// beginning and end and filling gaps. 
fn extend_validate_ranges( buckets: &[RangeAggregationRange], field_type: &Type, @@ -401,8 +408,13 @@ mod tests { ranges, }; - SegmentRangeCollector::from_req_and_validate(&req, &Default::default(), field_type) - .expect("unexpected error") + SegmentRangeCollector::from_req_and_validate( + &req, + &Default::default(), + &Default::default(), + field_type, + ) + .expect("unexpected error") } #[test] @@ -422,7 +434,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req); + let collector = AggregationCollector::from_aggs(agg_req, None); let reader = index.reader()?; let searcher = reader.searcher(); diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index c9833c8853..8a9970e0fd 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -11,7 +11,7 @@ use crate::aggregation::agg_req_with_accessor::{ use crate::aggregation::intermediate_agg_result::{ IntermediateBucketResult, IntermediateTermBucketEntry, IntermediateTermBucketResult, }; -use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector; +use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector}; use crate::error::DataCorruption; use crate::fastfield::MultiValuedFastFieldReader; use crate::schema::Type; @@ -244,28 +244,33 @@ impl TermBuckets { &mut self, term_ids: &[u64], doc: DocId, - bucket_with_accessor: &AggregationsWithAccessor, + sub_aggregation: &AggregationsWithAccessor, + bucket_count: &BucketCount, blueprint: &Option, - ) { - // self.ensure_vec_exists(term_ids); + ) -> crate::Result<()> { for &term_id in term_ids { - let entry = self - .entries - .entry(term_id as u32) - .or_insert_with(|| TermBucketEntry::from_blueprint(blueprint)); + let entry = self.entries.entry(term_id as u32).or_insert_with(|| { + bucket_count.add_count(1); + + TermBucketEntry::from_blueprint(blueprint) + }); entry.doc_count += 1; if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { - sub_aggregations.collect(doc, bucket_with_accessor); + sub_aggregations.collect(doc, sub_aggregation)?; } } + bucket_count.validate_bucket_count()?; + + Ok(()) } - fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) { + fn force_flush(&mut self, agg_with_accessor: &AggregationsWithAccessor) -> crate::Result<()> { for entry in &mut self.entries.values_mut() { if let Some(sub_aggregations) = entry.sub_aggregations.as_mut() { - sub_aggregations.flush_staged_docs(agg_with_accessor, false); + sub_aggregations.flush_staged_docs(agg_with_accessor, false)?; } } + Ok(()) } } @@ -421,7 +426,7 @@ impl SegmentTermCollector { doc: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { let accessor = bucket_with_accessor .accessor .as_multi() @@ -441,26 +446,30 @@ impl SegmentTermCollector { &vals1, docs[0], &bucket_with_accessor.sub_aggregation, + &bucket_with_accessor.bucket_count, &self.blueprint, - ); + )?; self.term_buckets.increment_bucket( &vals2, docs[1], &bucket_with_accessor.sub_aggregation, + &bucket_with_accessor.bucket_count, &self.blueprint, - ); + )?; self.term_buckets.increment_bucket( &vals3, docs[2], &bucket_with_accessor.sub_aggregation, + &bucket_with_accessor.bucket_count, &self.blueprint, - ); + )?; self.term_buckets.increment_bucket( &vals4, docs[3], &bucket_with_accessor.sub_aggregation, + &bucket_with_accessor.bucket_count, &self.blueprint, - ); + )?; } for &doc in iter.remainder() { accessor.get_vals(doc, 
&mut vals1); @@ -469,13 +478,15 @@ impl SegmentTermCollector { &vals1, doc, &bucket_with_accessor.sub_aggregation, + &bucket_with_accessor.bucket_count, &self.blueprint, - ); + )?; } if force_flush { self.term_buckets - .force_flush(&bucket_with_accessor.sub_aggregation); + .force_flush(&bucket_with_accessor.sub_aggregation)?; } + Ok(()) } } @@ -1173,6 +1184,33 @@ mod tests { Ok(()) } + #[test] + fn terms_aggregation_term_bucket_limit() -> crate::Result<()> { + let terms: Vec = (0..100_000).map(|el| el.to_string()).collect(); + let terms_per_segment = vec![terms.iter().map(|el| el.as_str()).collect()]; + + let index = get_test_index_from_terms(true, &terms_per_segment)?; + + let agg_req: Aggregations = vec![( + "my_texts".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Terms(TermsAggregation { + field: "string_id".to_string(), + min_doc_count: Some(0), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request_with_query(agg_req, &index, None); + assert!(res.is_err()); + + Ok(()) + } + #[test] fn test_json_format() -> crate::Result<()> { let agg_req: Aggregations = vec![( @@ -1291,9 +1329,15 @@ mod bench { let mut collector = get_collector_with_buckets(total_terms); let vals = get_rand_terms(total_terms, num_terms); let aggregations_with_accessor: AggregationsWithAccessor = Default::default(); + let bucket_count: BucketCount = BucketCount { + bucket_count: Default::default(), + max_bucket_count: 1_000_001u32, + }; b.iter(|| { for &val in &vals { - collector.increment_bucket(&[val], 0, &aggregations_with_accessor, &None); + collector + .increment_bucket(&[val], 0, &aggregations_with_accessor, &bucket_count, &None) + .unwrap(); } }) } diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index f35a5e3e17..c9510d9263 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -1,3 +1,5 @@ +use std::rc::Rc; + use super::agg_req::Aggregations; use super::agg_req_with_accessor::AggregationsWithAccessor; use super::agg_result::AggregationResults; @@ -5,19 +7,27 @@ use super::intermediate_agg_result::IntermediateAggregationResults; use super::segment_agg_result::SegmentAggregationResultsCollector; use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate; use crate::collector::{Collector, SegmentCollector}; -use crate::SegmentReader; +use crate::{SegmentReader, TantivyError}; + +pub const MAX_BUCKET_COUNT: u32 = 65000; /// Collector for aggregations. /// /// The collector collects all aggregations by the underlying aggregation request. pub struct AggregationCollector { agg: Aggregations, + max_bucket_count: u32, } impl AggregationCollector { /// Create collector from aggregation request. - pub fn from_aggs(agg: Aggregations) -> Self { - Self { agg } + /// + /// max_bucket_count will default to `MAX_BUCKET_COUNT` (65000) when unset + pub fn from_aggs(agg: Aggregations, max_bucket_count: Option) -> Self { + Self { + agg, + max_bucket_count: max_bucket_count.unwrap_or(MAX_BUCKET_COUNT), + } } } @@ -28,15 +38,21 @@ impl AggregationCollector { /// # Purpose /// AggregationCollector returns `IntermediateAggregationResults` and not the final /// `AggregationResults`, so that results from differenct indices can be merged and then converted -/// into the final `AggregationResults` via the `into()` method. +/// into the final `AggregationResults` via the `into_final_result()` method. 
pub struct DistributedAggregationCollector { agg: Aggregations, + max_bucket_count: u32, } impl DistributedAggregationCollector { /// Create collector from aggregation request. - pub fn from_aggs(agg: Aggregations) -> Self { - Self { agg } + /// + /// max_bucket_count will default to `MAX_BUCKET_COUNT` (65000) when unset + pub fn from_aggs(agg: Aggregations, max_bucket_count: Option) -> Self { + Self { + agg, + max_bucket_count: max_bucket_count.unwrap_or(MAX_BUCKET_COUNT), + } } } @@ -50,7 +66,11 @@ impl Collector for DistributedAggregationCollector { _segment_local_id: crate::SegmentOrdinal, reader: &crate::SegmentReader, ) -> crate::Result { - AggregationSegmentCollector::from_agg_req_and_reader(&self.agg, reader) + AggregationSegmentCollector::from_agg_req_and_reader( + &self.agg, + reader, + self.max_bucket_count, + ) } fn requires_scoring(&self) -> bool { @@ -75,7 +95,11 @@ impl Collector for AggregationCollector { _segment_local_id: crate::SegmentOrdinal, reader: &crate::SegmentReader, ) -> crate::Result { - AggregationSegmentCollector::from_agg_req_and_reader(&self.agg, reader) + AggregationSegmentCollector::from_agg_req_and_reader( + &self.agg, + reader, + self.max_bucket_count, + ) } fn requires_scoring(&self) -> bool { @@ -87,7 +111,7 @@ impl Collector for AggregationCollector { segment_fruits: Vec<::Fruit>, ) -> crate::Result { let res = merge_fruits(segment_fruits)?; - AggregationResults::from_intermediate_and_req(res, self.agg.clone()) + res.into_final_bucket_result(self.agg.clone()) } } @@ -109,6 +133,7 @@ fn merge_fruits( pub struct AggregationSegmentCollector { aggs_with_accessor: AggregationsWithAccessor, result: SegmentAggregationResultsCollector, + error: Option, } impl AggregationSegmentCollector { @@ -117,13 +142,16 @@ impl AggregationSegmentCollector { pub fn from_agg_req_and_reader( agg: &Aggregations, reader: &SegmentReader, + max_bucket_count: u32, ) -> crate::Result { - let aggs_with_accessor = get_aggs_with_accessor_and_validate(agg, reader)?; + let aggs_with_accessor = + get_aggs_with_accessor_and_validate(agg, reader, Rc::default(), max_bucket_count)?; let result = SegmentAggregationResultsCollector::from_req_and_validate(&aggs_with_accessor)?; Ok(AggregationSegmentCollector { aggs_with_accessor, result, + error: None, }) } } @@ -133,12 +161,20 @@ impl SegmentCollector for AggregationSegmentCollector { #[inline] fn collect(&mut self, doc: crate::DocId, _score: crate::Score) { - self.result.collect(doc, &self.aggs_with_accessor); + if self.error.is_some() { + return; + } + if let Err(err) = self.result.collect(doc, &self.aggs_with_accessor) { + self.error = Some(err); + } } fn harvest(mut self) -> Self::Fruit { + if let Some(err) = self.error { + return Err(err); + } self.result - .flush_staged_docs(&self.aggs_with_accessor, true); + .flush_staged_docs(&self.aggs_with_accessor, true)?; self.result .into_intermediate_aggregations_result(&self.aggs_with_accessor) } diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 9bde00707e..20eef59c07 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -3,16 +3,20 @@ //! indices. 
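The `from_aggs` signatures above now take an optional bucket limit, defaulting to `MAX_BUCKET_COUNT` (65000). A hedged usage sketch, assuming an existing `Searcher` and aggregation request; the `1_000` limit is an arbitrary illustrative value:

```rust
use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::AllQuery;
use tantivy::Searcher;

/// Runs the same aggregation request with the default bucket limit (65_000)
/// and with a stricter one. With the strict limit, a request that would
/// create too many buckets fails with `TantivyError::InvalidArgument`
/// instead of consuming unbounded memory.
fn run_with_limits(searcher: &Searcher, agg_req: Aggregations) -> tantivy::Result<()> {
    // None -> default MAX_BUCKET_COUNT.
    let collector = AggregationCollector::from_aggs(agg_req.clone(), None);
    let _default_limit_res: AggregationResults = searcher.search(&AllQuery, &collector)?;

    // Some(limit) -> custom cap shared across the whole aggregation tree.
    let strict_collector = AggregationCollector::from_aggs(agg_req, Some(1_000));
    match searcher.search(&AllQuery, &strict_collector) {
        Ok(_res) => println!("request stayed under 1_000 buckets"),
        Err(err) => println!("aggregation aborted: {}", err),
    }
    Ok(())
}
```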
use std::cmp::Ordering; +use std::collections::HashMap; use fnv::FnvHashMap; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use super::agg_req::{AggregationsInternal, BucketAggregationType, MetricAggregation}; -use super::agg_result::BucketResult; +use super::agg_req::{ + Aggregations, AggregationsInternal, BucketAggregationInternal, BucketAggregationType, + MetricAggregation, +}; +use super::agg_result::{AggregationResult, BucketResult, RangeBucketEntry}; use super::bucket::{ - cut_off_buckets, get_agg_name_and_property, GetDocCount, Order, OrderTarget, - SegmentHistogramBucketEntry, TermsAggregation, + cut_off_buckets, get_agg_name_and_property, intermediate_histogram_buckets_to_final_buckets, + GetDocCount, Order, OrderTarget, SegmentHistogramBucketEntry, TermsAggregation, }; use super::metric::{IntermediateAverage, IntermediateStats}; use super::segment_agg_result::SegmentMetricResultCollector; @@ -31,6 +35,46 @@ pub struct IntermediateAggregationResults { } impl IntermediateAggregationResults { + /// Convert intermediate result and its aggregation request to the final result. + pub(crate) fn into_final_bucket_result( + self, + req: Aggregations, + ) -> crate::Result { + self.into_final_bucket_result_internal(&(req.into())) + } + + /// Convert intermediate result and its aggregation request to the final result. + /// + /// Internal function, AggregationsInternal is used instead Aggregations, which is optimized + /// for internal processing, by splitting metric and buckets into seperate groups. + pub(crate) fn into_final_bucket_result_internal( + self, + req: &AggregationsInternal, + ) -> crate::Result { + // Important assumption: + // When the tree contains buckets/metric, we expect it to have all buckets/metrics from the + // request + let mut results: HashMap = HashMap::new(); + + if let Some(buckets) = self.buckets { + convert_and_add_final_buckets_to_result(&mut results, buckets, &req.buckets)? + } else { + // When there are no buckets, we create empty buckets, so that the serialized json + // format is constant + add_empty_final_buckets_to_result(&mut results, &req.buckets)? 
+ }; + + if let Some(metrics) = self.metrics { + convert_and_add_final_metrics_to_result(&mut results, metrics); + } else { + // When there are no metrics, we create empty metric results, so that the serialized + // json format is constant + add_empty_final_metrics_to_result(&mut results, &req.metrics)?; + } + + Ok(AggregationResults(results)) + } + pub(crate) fn empty_from_req(req: &AggregationsInternal) -> Self { let metrics = if req.metrics.is_empty() { None @@ -90,6 +134,58 @@ impl IntermediateAggregationResults { } } +fn convert_and_add_final_metrics_to_result( + results: &mut HashMap, + metrics: VecWithNames, +) { + results.extend( + metrics + .into_iter() + .map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))), + ); +} + +fn add_empty_final_metrics_to_result( + results: &mut HashMap, + req_metrics: &VecWithNames, +) -> crate::Result<()> { + results.extend(req_metrics.iter().map(|(key, req)| { + let empty_bucket = IntermediateMetricResult::empty_from_req(req); + ( + key.to_string(), + AggregationResult::MetricResult(empty_bucket.into()), + ) + })); + Ok(()) +} + +fn add_empty_final_buckets_to_result( + results: &mut HashMap, + req_buckets: &VecWithNames, +) -> crate::Result<()> { + let requested_buckets = req_buckets.iter(); + for (key, req) in requested_buckets { + let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?); + results.insert(key.to_string(), empty_bucket); + } + Ok(()) +} + +fn convert_and_add_final_buckets_to_result( + results: &mut HashMap, + buckets: VecWithNames, + req_buckets: &VecWithNames, +) -> crate::Result<()> { + assert_eq!(buckets.len(), req_buckets.len()); + + let buckets_with_request = buckets.into_iter().zip(req_buckets.values()); + for ((key, bucket), req) in buckets_with_request { + let result = AggregationResult::BucketResult(bucket.into_final_bucket_result(req)?); + results.insert(key, result); + } + Ok(()) +} + /// An aggregation is either a bucket or a metric. 
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum IntermediateAggregationResult { @@ -171,6 +267,45 @@ pub enum IntermediateBucketResult { } impl IntermediateBucketResult { + pub(crate) fn into_final_bucket_result( + self, + req: &BucketAggregationInternal, + ) -> crate::Result { + match self { + IntermediateBucketResult::Range(range_res) => { + let mut buckets: Vec = range_res + .buckets + .into_iter() + .map(|(_, bucket)| bucket.into_final_bucket_entry(&req.sub_aggregation)) + .collect::>>()?; + + buckets.sort_by(|left, right| { + // TODO use total_cmp next stable rust release + left.from + .unwrap_or(f64::MIN) + .partial_cmp(&right.from.unwrap_or(f64::MIN)) + .unwrap_or(Ordering::Equal) + }); + Ok(BucketResult::Range { buckets }) + } + IntermediateBucketResult::Histogram { buckets } => { + let buckets = intermediate_histogram_buckets_to_final_buckets( + buckets, + req.as_histogram() + .expect("unexpected aggregation, expected histogram aggregation"), + &req.sub_aggregation, + )?; + + Ok(BucketResult::Histogram { buckets }) + } + IntermediateBucketResult::Terms(terms) => terms.into_final_result( + req.as_term() + .expect("unexpected aggregation, expected term aggregation"), + &req.sub_aggregation, + ), + } + } + pub(crate) fn empty_from_req(req: &BucketAggregationType) -> Self { match req { BucketAggregationType::Terms(_) => IntermediateBucketResult::Terms(Default::default()), @@ -267,10 +402,9 @@ impl IntermediateTermBucketResult { Ok(BucketEntry { key: Key::Str(key), doc_count: entry.doc_count, - sub_aggregation: AggregationResults::from_intermediate_and_req_internal( - entry.sub_aggregation, - sub_aggregation_req, - )?, + sub_aggregation: entry + .sub_aggregation + .into_final_bucket_result_internal(sub_aggregation_req)?, }) }) .collect::>()?; @@ -374,6 +508,21 @@ pub struct IntermediateHistogramBucketEntry { pub sub_aggregation: IntermediateAggregationResults, } +impl IntermediateHistogramBucketEntry { + pub(crate) fn into_final_bucket_entry( + self, + req: &AggregationsInternal, + ) -> crate::Result { + Ok(BucketEntry { + key: Key::F64(self.key), + doc_count: self.doc_count, + sub_aggregation: self + .sub_aggregation + .into_final_bucket_result_internal(req)?, + }) + } +} + impl From for IntermediateHistogramBucketEntry { fn from(entry: SegmentHistogramBucketEntry) -> Self { IntermediateHistogramBucketEntry { @@ -402,6 +551,23 @@ pub struct IntermediateRangeBucketEntry { pub to: Option, } +impl IntermediateRangeBucketEntry { + pub(crate) fn into_final_bucket_entry( + self, + req: &AggregationsInternal, + ) -> crate::Result { + Ok(RangeBucketEntry { + key: self.key, + doc_count: self.doc_count, + sub_aggregation: self + .sub_aggregation + .into_final_bucket_result_internal(req)?, + to: self.to, + from: self.from, + }) + } +} + /// This is the term entry for a bucket, which contains a count, and optionally /// sub_aggregations. 
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index 0498ffbe80..2f704b17d0 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -222,7 +222,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let reader = index.reader()?; let searcher = reader.searcher(); @@ -299,7 +299,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap(); diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index 37fa05c0ff..7f6f8378ce 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -28,7 +28,7 @@ //! //! ```verbatim //! let agg_req: Aggregations = serde_json::from_str(json_request_string).unwrap(); -//! let collector = AggregationCollector::from_aggs(agg_req); +//! let collector = AggregationCollector::from_aggs(agg_req, None); //! let searcher = reader.searcher(); //! let agg_res = searcher.search(&term_query, &collector).unwrap_err(); //! let json_response_string: String = &serde_json::to_string(&agg_res)?; @@ -68,7 +68,7 @@ //! .into_iter() //! .collect(); //! -//! let collector = AggregationCollector::from_aggs(agg_req); +//! let collector = AggregationCollector::from_aggs(agg_req, None); //! //! let searcher = reader.searcher(); //! let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); @@ -358,7 +358,7 @@ mod tests { index: &Index, query: Option<(&str, &str)>, ) -> crate::Result { - let collector = AggregationCollector::from_aggs(agg_req); + let collector = AggregationCollector::from_aggs(agg_req, None); let reader = index.reader()?; let searcher = reader.searcher(); @@ -417,7 +417,9 @@ mod tests { let mut schema_builder = Schema::builder(); let text_fieldtype = crate::schema::TextOptions::default() .set_indexing_options( - TextFieldIndexing::default().set_index_option(IndexRecordOption::WithFreqs), + TextFieldIndexing::default() + .set_index_option(IndexRecordOption::Basic) + .set_fieldnorms(false), ) .set_fast() .set_stored(); @@ -435,7 +437,8 @@ mod tests { ); let index = Index::create_in_ram(schema_builder.build()); { - let mut index_writer = index.writer_for_tests()?; + // let mut index_writer = index.writer_for_tests()?; + let mut index_writer = index.writer_with_num_threads(1, 30_000_000)?; for values in segment_and_values { for (i, term) in values { let i = *i; @@ -457,9 +460,11 @@ mod tests { let segment_ids = index .searchable_segment_ids() .expect("Searchable segments failed."); - let mut index_writer = index.writer_for_tests()?; - index_writer.merge(&segment_ids).wait()?; - index_writer.wait_merging_threads()?; + if segment_ids.len() > 1 { + let mut index_writer = index.writer_for_tests()?; + index_writer.merge(&segment_ids).wait()?; + index_writer.wait_merging_threads()?; + } } Ok(index) @@ -542,16 +547,15 @@ mod tests { .unwrap(); let agg_res: AggregationResults = if use_distributed_collector { - let collector = DistributedAggregationCollector::from_aggs(agg_req.clone()); + let collector = DistributedAggregationCollector::from_aggs(agg_req.clone(), None); let searcher = reader.searcher(); - AggregationResults::from_intermediate_and_req( - searcher.search(&AllQuery, 
&collector).unwrap(), - agg_req, - ) - .unwrap() + let intermediate_agg_result = searcher.search(&AllQuery, &collector).unwrap(); + intermediate_agg_result + .into_final_bucket_result(agg_req) + .unwrap() } else { - let collector = AggregationCollector::from_aggs(agg_req); + let collector = AggregationCollector::from_aggs(agg_req, None); let searcher = reader.searcher(); searcher.search(&AllQuery, &collector).unwrap() @@ -788,7 +792,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = searcher.search(&term_query, &collector).unwrap(); @@ -978,16 +982,16 @@ mod tests { assert_eq!(field_names, vec!["text".to_string()].into_iter().collect()); let agg_res: AggregationResults = if use_distributed_collector { - let collector = DistributedAggregationCollector::from_aggs(agg_req.clone()); + let collector = DistributedAggregationCollector::from_aggs(agg_req.clone(), None); let searcher = reader.searcher(); let res = searcher.search(&term_query, &collector).unwrap(); // Test de/serialization roundtrip on intermediate_agg_result let res: IntermediateAggregationResults = serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap(); - AggregationResults::from_intermediate_and_req(res, agg_req.clone()).unwrap() + res.into_final_bucket_result(agg_req.clone()).unwrap() } else { - let collector = AggregationCollector::from_aggs(agg_req.clone()); + let collector = AggregationCollector::from_aggs(agg_req.clone(), None); let searcher = reader.searcher(); searcher.search(&term_query, &collector).unwrap() @@ -1045,7 +1049,7 @@ mod tests { ); // Test empty result set - let collector = AggregationCollector::from_aggs(agg_req); + let collector = AggregationCollector::from_aggs(agg_req, None); let searcher = reader.searcher(); searcher.search(&query_with_no_hits, &collector).unwrap(); @@ -1110,7 +1114,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); @@ -1223,7 +1227,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1254,7 +1258,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1285,7 +1289,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1324,7 +1328,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1353,7 +1357,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req); + let collector = AggregationCollector::from_aggs(agg_req, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1382,7 +1386,7 @@ mod tests { .into_iter() .collect(); - let 
collector = AggregationCollector::from_aggs(agg_req); + let collector = AggregationCollector::from_aggs(agg_req, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1418,7 +1422,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1453,7 +1457,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1492,7 +1496,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1522,7 +1526,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = @@ -1578,7 +1582,7 @@ mod tests { .into_iter() .collect(); - let collector = AggregationCollector::from_aggs(agg_req_1); + let collector = AggregationCollector::from_aggs(agg_req_1, None); let searcher = reader.searcher(); let agg_res: AggregationResults = diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 81f2b85de9..fe07400897 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -4,19 +4,22 @@ //! merging. use std::fmt::Debug; +use std::rc::Rc; +use std::sync::atomic::AtomicU32; use super::agg_req::MetricAggregation; use super::agg_req_with_accessor::{ AggregationsWithAccessor, BucketAggregationWithAccessor, MetricAggregationWithAccessor, }; use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector, SegmentTermCollector}; +use super::collector::MAX_BUCKET_COUNT; use super::intermediate_agg_result::{IntermediateAggregationResults, IntermediateBucketResult}; use super::metric::{ AverageAggregation, SegmentAverageCollector, SegmentStatsCollector, StatsAggregation, }; use super::VecWithNames; use crate::aggregation::agg_req::BucketAggregationType; -use crate::DocId; +use crate::{DocId, TantivyError}; pub(crate) const DOC_BLOCK_SIZE: usize = 64; pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE]; @@ -115,21 +118,22 @@ impl SegmentAggregationResultsCollector { &mut self, doc: crate::DocId, agg_with_accessor: &AggregationsWithAccessor, - ) { + ) -> crate::Result<()> { self.staged_docs[self.num_staged_docs] = doc; self.num_staged_docs += 1; if self.num_staged_docs == self.staged_docs.len() { - self.flush_staged_docs(agg_with_accessor, false); + self.flush_staged_docs(agg_with_accessor, false)?; } + Ok(()) } pub(crate) fn flush_staged_docs( &mut self, agg_with_accessor: &AggregationsWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { if self.num_staged_docs == 0 { - return; + return Ok(()); } if let Some(metrics) = &mut self.metrics { for (collector, agg_with_accessor) in @@ -148,11 +152,12 @@ impl SegmentAggregationResultsCollector { &self.staged_docs[..self.num_staged_docs], agg_with_accessor, force_flush, - ); + )?; } } self.num_staged_docs = 0; + Ok(()) } } @@ -234,6 +239,7 @@ impl SegmentBucketResultCollector { Ok(Self::Range(SegmentRangeCollector::from_req_and_validate( range_req, 
&req.sub_aggregation, + &req.bucket_count, req.field_type, )?)) } @@ -256,17 +262,52 @@ impl SegmentBucketResultCollector { doc: &[DocId], bucket_with_accessor: &BucketAggregationWithAccessor, force_flush: bool, - ) { + ) -> crate::Result<()> { match self { SegmentBucketResultCollector::Range(range) => { - range.collect_block(doc, bucket_with_accessor, force_flush); + range.collect_block(doc, bucket_with_accessor, force_flush)?; } SegmentBucketResultCollector::Histogram(histogram) => { - histogram.collect_block(doc, bucket_with_accessor, force_flush) + histogram.collect_block(doc, bucket_with_accessor, force_flush)?; } SegmentBucketResultCollector::Terms(terms) => { - terms.collect_block(doc, bucket_with_accessor, force_flush) + terms.collect_block(doc, bucket_with_accessor, force_flush)?; } } + Ok(()) + } +} + +#[derive(Clone)] +pub(crate) struct BucketCount { + /// The counter which is shared between the aggregations for one request. + pub(crate) bucket_count: Rc, + pub(crate) max_bucket_count: u32, +} + +impl Default for BucketCount { + fn default() -> Self { + Self { + bucket_count: Default::default(), + max_bucket_count: MAX_BUCKET_COUNT, + } + } +} + +impl BucketCount { + pub(crate) fn validate_bucket_count(&self) -> crate::Result<()> { + if self.get_count() > self.max_bucket_count { + return Err(TantivyError::InvalidArgument( + "Aborting aggregation because too many buckets were created".to_string(), + )); + } + Ok(()) + } + pub(crate) fn add_count(&self, count: u32) { + self.bucket_count + .fetch_add(count as u32, std::sync::atomic::Ordering::Relaxed); + } + pub(crate) fn get_count(&self) -> u32 { + self.bucket_count.load(std::sync::atomic::Ordering::Relaxed) } } diff --git a/src/collector/histogram_collector.rs b/src/collector/histogram_collector.rs index 22956a86a2..c4dfba59a0 100644 --- a/src/collector/histogram_collector.rs +++ b/src/collector/histogram_collector.rs @@ -72,8 +72,7 @@ impl HistogramComputer { return; } let delta = value - self.min_value; - let delta_u64 = delta.to_u64(); - let bucket_id: usize = self.divider.divide(delta_u64) as usize; + let bucket_id: usize = self.divider.divide(delta) as usize; if bucket_id < self.counts.len() { self.counts[bucket_id] += 1; } @@ -287,7 +286,7 @@ mod tests { DateTime::from_primitive( Date::from_calendar_date(1980, Month::January, 1)?.with_hms(0, 0, 0)?, ), - 3600 * 24 * 365, // it is just for a unit test... sorry leap years. + 3_600_000_000 * 24 * 365, // it is just for a unit test... sorry leap years. 
10, ); let week_histogram = searcher.search(&all_query, &week_histogram_collector)?; diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs index f8d3dd34ab..b24faf73e4 100644 --- a/src/core/index_meta.rs +++ b/src/core/index_meta.rs @@ -469,7 +469,7 @@ mod tests { fn test_serialize_metas_invalid_comp() { let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zsstd","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#; - let err = serde_json::from_str::(&json).unwrap_err(); + let err = serde_json::from_str::(json).unwrap_err(); assert_eq!( err.to_string(), "unknown variant `zsstd`, expected one of `none`, `lz4`, `brotli`, `snappy`, `zstd`, \ @@ -479,7 +479,7 @@ mod tests { let json = r#"{"index_settings":{"sort_by_field":{"field":"text","order":"Asc"},"docstore_compression":"zstd(bla=10)","docstore_blocksize":1000000},"segments":[],"schema":[{"name":"text","type":"text","options":{"indexing":{"record":"position","fieldnorms":true,"tokenizer":"default"},"stored":false,"fast":false}}],"opstamp":0}"#; - let err = serde_json::from_str::(&json).unwrap_err(); + let err = serde_json::from_str::(json).unwrap_err(); assert_eq!( err.to_string(), "unknown zstd option \"bla\" at line 1 column 103".to_string() diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index ce16d756ad..004a5328e6 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -52,11 +52,13 @@ pub trait MultiValueLength { fn get_total_len(&self) -> u64; } -/// Trait for types that are allowed for fast fields: (u64, i64 and f64). +/// Trait for types that are allowed for fast fields: +/// (u64, i64 and f64, bool, DateTime). pub trait FastValue: Clone + Copy + Send + Sync + PartialOrd + 'static { /// Converts a value from u64 /// /// Internally all fast field values are encoded as u64. + /// **Note: To be used for converting encoded Term, Posting values.** fn from_u64(val: u64) -> Self; /// Converts a value to u64. @@ -162,10 +164,7 @@ impl FastValue for f64 { impl FastValue for bool { fn from_u64(val: u64) -> Self { - match val { - 0 => false, - _ => true, - } + val != 0u64 } fn to_u64(&self) -> u64 { @@ -192,24 +191,27 @@ impl FastValue for bool { } impl FastValue for DateTime { - fn from_u64(timestamp_u64: u64) -> Self { - let unix_timestamp = i64::from_u64(timestamp_u64); - Self::from_unix_timestamp(unix_timestamp) + /// Converts a timestamp microseconds into DateTime. 
+ /// + /// **Note the timestamps is expected to be in microseconds.** + fn from_u64(timestamp_micros_u64: u64) -> Self { + let timestamp_micros = i64::from_u64(timestamp_micros_u64); + Self::from_timestamp_micros(timestamp_micros) } fn to_u64(&self) -> u64 { - self.into_unix_timestamp().to_u64() + common::i64_to_u64(self.into_timestamp_micros()) } fn fast_field_cardinality(field_type: &FieldType) -> Option { match *field_type { - FieldType::Date(ref integer_options) => integer_options.get_fastfield_cardinality(), + FieldType::Date(ref options) => options.get_fastfield_cardinality(), _ => None, } } fn as_u64(&self) -> u64 { - self.into_unix_timestamp().as_u64() + self.into_timestamp_micros().as_u64() } fn to_type() -> Type { @@ -264,9 +266,9 @@ mod tests { use super::*; use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr}; use crate::merge_policy::NoMergePolicy; - use crate::schema::{Document, Field, NumericOptions, Schema, FAST, STRING, TEXT}; + use crate::schema::{Document, Field, Schema, FAST, STRING, TEXT}; use crate::time::OffsetDateTime; - use crate::{Index, SegmentId, SegmentReader}; + use crate::{DateOptions, DatePrecision, Index, SegmentId, SegmentReader}; pub static SCHEMA: Lazy = Lazy::new(|| { let mut schema_builder = Schema::builder(); @@ -562,8 +564,8 @@ mod tests { } #[test] - fn test_default_datetime() { - assert_eq!(0, DateTime::make_zero().into_unix_timestamp()); + fn test_default_date() { + assert_eq!(0, DateTime::make_zero().into_timestamp_secs()); } fn get_vals_for_docs(ff: &MultiValuedFastFieldReader, docs: Range) -> Vec { @@ -769,10 +771,15 @@ mod tests { fn test_datefastfield() -> crate::Result<()> { use crate::fastfield::FastValue; let mut schema_builder = Schema::builder(); - let date_field = schema_builder.add_date_field("date", FAST); + let date_field = schema_builder.add_date_field( + "date", + DateOptions::from(FAST).set_precision(DatePrecision::Microseconds), + ); let multi_date_field = schema_builder.add_date_field( "multi_date", - NumericOptions::default().set_fast(Cardinality::MultiValues), + DateOptions::default() + .set_precision(DatePrecision::Microseconds) + .set_fast(Cardinality::MultiValues), ); let schema = schema_builder.build(); let index = Index::create_in_ram(schema); @@ -800,23 +807,23 @@ mod tests { let dates_fast_field = fast_fields.dates(multi_date_field).unwrap(); let mut dates = vec![]; { - assert_eq!(date_fast_field.get(0u32).into_unix_timestamp(), 1i64); + assert_eq!(date_fast_field.get(0u32).into_timestamp_micros(), 1i64); dates_fast_field.get_vals(0u32, &mut dates); assert_eq!(dates.len(), 2); - assert_eq!(dates[0].into_unix_timestamp(), 2i64); - assert_eq!(dates[1].into_unix_timestamp(), 3i64); + assert_eq!(dates[0].into_timestamp_micros(), 2i64); + assert_eq!(dates[1].into_timestamp_micros(), 3i64); } { - assert_eq!(date_fast_field.get(1u32).into_unix_timestamp(), 4i64); + assert_eq!(date_fast_field.get(1u32).into_timestamp_micros(), 4i64); dates_fast_field.get_vals(1u32, &mut dates); assert!(dates.is_empty()); } { - assert_eq!(date_fast_field.get(2u32).into_unix_timestamp(), 0i64); + assert_eq!(date_fast_field.get(2u32).into_timestamp_micros(), 0i64); dates_fast_field.get_vals(2u32, &mut dates); assert_eq!(dates.len(), 2); - assert_eq!(dates[0].into_unix_timestamp(), 5i64); - assert_eq!(dates[1].into_unix_timestamp(), 6i64); + assert_eq!(dates[0].into_timestamp_micros(), 5i64); + assert_eq!(dates[1].into_timestamp_micros(), 6i64); } Ok(()) } @@ -853,7 +860,7 @@ mod tests { .unwrap(); serializer.close().unwrap(); 
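As the updated `FastValue` impl above shows, `DateTime` fast-field values are now encoded from the microsecond timestamp instead of the second timestamp. A small round-trip sketch against the API introduced in this patch (only exports shown in the diff are used; purely illustrative):

use tantivy::fastfield::FastValue;
use tantivy::DateTime;

fn main() {
    // DateTime fast-field values are encoded as order-preserving u64s
    // derived from the microsecond timestamp.
    let dt = DateTime::from_timestamp_micros(1_655_902_430_530_000);
    let encoded: u64 = dt.to_u64();
    let decoded = DateTime::from_u64(encoded);
    assert_eq!(decoded.into_timestamp_micros(), dt.into_timestamp_micros());
}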
} - let file = directory.open_read(&path).unwrap(); + let file = directory.open_read(path).unwrap(); assert_eq!(file.len(), 36); let composite_file = CompositeFile::open(&file)?; let file = composite_file.open_read(field).unwrap(); @@ -889,7 +896,7 @@ mod tests { .unwrap(); serializer.close().unwrap(); } - let file = directory.open_read(&path).unwrap(); + let file = directory.open_read(path).unwrap(); assert_eq!(file.len(), 48); let composite_file = CompositeFile::open(&file)?; let file = composite_file.open_read(field).unwrap(); @@ -923,7 +930,7 @@ mod tests { .unwrap(); serializer.close().unwrap(); } - let file = directory.open_read(&path).unwrap(); + let file = directory.open_read(path).unwrap(); assert_eq!(file.len(), 35); let composite_file = CompositeFile::open(&file)?; let file = composite_file.open_read(field).unwrap(); diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index 172f758e0e..69870d0324 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -13,7 +13,7 @@ mod tests { use crate::collector::TopDocs; use crate::indexer::NoMergePolicy; use crate::query::QueryParser; - use crate::schema::{Cardinality, Facet, FacetOptions, NumericOptions, Schema}; + use crate::schema::{Cardinality, DateOptions, Facet, FacetOptions, NumericOptions, Schema}; use crate::time::format_description::well_known::Rfc3339; use crate::time::{Duration, OffsetDateTime}; use crate::{DateTime, Document, Index, Term}; @@ -58,7 +58,7 @@ mod tests { let mut schema_builder = Schema::builder(); let date_field = schema_builder.add_date_field( "multi_date_field", - NumericOptions::default() + DateOptions::default() .set_fast(Cardinality::MultiValues) .set_indexed() .set_fieldnorm() diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index 7adbfb0bbe..7b976e3273 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -4,12 +4,12 @@ use fnv::FnvHashMap; use tantivy_bitpacker::minmax; use crate::fastfield::serializer::BitpackedFastFieldSerializerLegacy; -use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType}; +use crate::fastfield::{value_to_u64, CompositeFastFieldSerializer, FastFieldType, FastValue}; use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::UnorderedTermId; -use crate::schema::{Document, Field}; +use crate::schema::{Document, Field, Value}; use crate::termdict::TermOrdinal; -use crate::DocId; +use crate::{DatePrecision, DocId}; /// Writer for multi-valued (as in, more than one value per document) /// int fast field. @@ -36,6 +36,7 @@ use crate::DocId; /// term ids when the segment is getting serialized. 
pub struct MultiValuedFastFieldWriter { field: Field, + precision_opt: Option, vals: Vec, doc_index: Vec, fast_field_type: FastFieldType, @@ -43,9 +44,14 @@ pub struct MultiValuedFastFieldWriter { impl MultiValuedFastFieldWriter { /// Creates a new `MultiValuedFastFieldWriter` - pub(crate) fn new(field: Field, fast_field_type: FastFieldType) -> Self { + pub(crate) fn new( + field: Field, + fast_field_type: FastFieldType, + precision_opt: Option, + ) -> Self { MultiValuedFastFieldWriter { field, + precision_opt, vals: Vec::new(), doc_index: Vec::new(), fast_field_type, @@ -83,7 +89,14 @@ impl MultiValuedFastFieldWriter { } for field_value in doc.field_values() { if field_value.field == self.field { - self.add_val(value_to_u64(field_value.value())); + let value = field_value.value(); + let value_u64 = match (self.precision_opt, value) { + (Some(precision), Value::Date(date_val)) => { + date_val.truncate(precision).to_u64() + } + _ => value_to_u64(value), + }; + self.add_val(value_u64); } } } diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index c0e9b6982b..4d1b5d3467 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -7,12 +7,13 @@ use tantivy_bitpacker::BlockedBitpacker; use super::multivalued::MultiValuedFastFieldWriter; use super::serializer::FastFieldStats; -use super::{FastFieldDataAccess, FastFieldType}; +use super::{FastFieldDataAccess, FastFieldType, FastValue}; use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer}; use crate::indexer::doc_id_mapping::DocIdMapping; use crate::postings::UnorderedTermId; -use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema}; +use crate::schema::{Cardinality, Document, Field, FieldEntry, FieldType, Schema, Value}; use crate::termdict::TermOrdinal; +use crate::DatePrecision; /// The `FastFieldsWriter` groups all of the fast field writers. 
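Both fast-field writers in this patch truncate incoming `Value::Date` values to the configured precision before bit-packing them. A standalone sketch of that truncation arithmetic, mirroring the `pub(crate) DateTime::truncate` added in `src/lib.rs` further down (the free function name here is illustrative, not tantivy API):

use tantivy::DatePrecision;

// Mirrors the integer-division truncation used by the fast-field writers.
fn truncate_micros(timestamp_micros: i64, precision: DatePrecision) -> i64 {
    match precision {
        DatePrecision::Seconds => (timestamp_micros / 1_000_000) * 1_000_000,
        DatePrecision::Milliseconds => (timestamp_micros / 1_000) * 1_000,
        DatePrecision::Microseconds => timestamp_micros,
    }
}

fn main() {
    // 2022-06-22T12:53:50.530000Z as a microsecond timestamp.
    let micros = 1_655_902_430_530_000_i64;
    assert_eq!(truncate_micros(micros, DatePrecision::Seconds), 1_655_902_430_000_000);
    assert_eq!(truncate_micros(micros, DatePrecision::Milliseconds), 1_655_902_430_530_000);
}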
pub struct FastFieldsWriter { @@ -43,31 +44,51 @@ impl FastFieldsWriter { FieldType::I64(ref int_options) | FieldType::U64(ref int_options) | FieldType::F64(ref int_options) - | FieldType::Bool(ref int_options) - | FieldType::Date(ref int_options) => { + | FieldType::Bool(ref int_options) => { match int_options.get_fastfield_cardinality() { Some(Cardinality::SingleValue) => { - let mut fast_field_writer = IntFastFieldWriter::new(field); + let mut fast_field_writer = IntFastFieldWriter::new(field, None); let default_value = fast_field_default_value(field_entry); fast_field_writer.set_val_if_missing(default_value); single_value_writers.push(fast_field_writer); } Some(Cardinality::MultiValues) => { - let fast_field_writer = - MultiValuedFastFieldWriter::new(field, FastFieldType::Numeric); + let fast_field_writer = MultiValuedFastFieldWriter::new( + field, + FastFieldType::Numeric, + None, + ); multi_values_writers.push(fast_field_writer); } None => {} } } + FieldType::Date(ref options) => match options.get_fastfield_cardinality() { + Some(Cardinality::SingleValue) => { + let mut fast_field_writer = + IntFastFieldWriter::new(field, Some(options.get_precision())); + let default_value = fast_field_default_value(field_entry); + fast_field_writer.set_val_if_missing(default_value); + single_value_writers.push(fast_field_writer); + } + Some(Cardinality::MultiValues) => { + let fast_field_writer = MultiValuedFastFieldWriter::new( + field, + FastFieldType::Numeric, + Some(options.get_precision()), + ); + multi_values_writers.push(fast_field_writer); + } + None => {} + }, FieldType::Facet(_) => { let fast_field_writer = - MultiValuedFastFieldWriter::new(field, FastFieldType::Facet); + MultiValuedFastFieldWriter::new(field, FastFieldType::Facet, None); term_id_writers.push(fast_field_writer); } FieldType::Str(_) if field_entry.is_fast() => { let fast_field_writer = - MultiValuedFastFieldWriter::new(field, FastFieldType::String); + MultiValuedFastFieldWriter::new(field, FastFieldType::String, None); term_id_writers.push(fast_field_writer); } FieldType::Bytes(bytes_option) => { @@ -230,6 +251,7 @@ impl FastFieldsWriter { /// using `common::i64_to_u64` and `common::f64_to_u64`. 
pub struct IntFastFieldWriter { field: Field, + precision_opt: Option, vals: BlockedBitpacker, val_count: usize, val_if_missing: u64, @@ -239,9 +261,10 @@ pub struct IntFastFieldWriter { impl IntFastFieldWriter { /// Creates a new `IntFastFieldWriter` - pub fn new(field: Field) -> IntFastFieldWriter { + pub fn new(field: Field, precision_opt: Option) -> IntFastFieldWriter { IntFastFieldWriter { field, + precision_opt, vals: BlockedBitpacker::new(), val_count: 0, val_if_missing: 0u64, @@ -305,7 +328,13 @@ impl IntFastFieldWriter { pub fn add_document(&mut self, doc: &Document) { match doc.get_first(self.field) { Some(v) => { - self.add_val(super::value_to_u64(v)); + let value = match (self.precision_opt, v) { + (Some(precision), Value::Date(date_val)) => { + date_val.truncate(precision).to_u64() + } + _ => super::value_to_u64(v), + }; + self.add_val(value); } None => { self.add_val(self.val_if_missing); diff --git a/src/indexer/json_term_writer.rs b/src/indexer/json_term_writer.rs index 501be0cfc1..d3d6478c5a 100644 --- a/src/indexer/json_term_writer.rs +++ b/src/indexer/json_term_writer.rs @@ -8,7 +8,7 @@ use crate::schema::{Field, Type}; use crate::time::format_description::well_known::Rfc3339; use crate::time::{OffsetDateTime, UtcOffset}; use crate::tokenizer::TextAnalyzer; -use crate::{DateTime, DocId, Term}; +use crate::{DatePrecision, DateTime, DocId, Term}; /// This object is a map storing the last position for a given path for the current document /// being indexed. @@ -323,9 +323,16 @@ impl<'a> JsonTermWriter<'a> { pub fn set_fast_value(&mut self, val: T) { self.close_path_and_set_type(T::to_type()); + let value = if T::to_type() == Type::Date { + DateTime::from_u64(val.to_u64()) + .truncate(DatePrecision::Seconds) + .to_u64() + } else { + val.to_u64() + }; self.term_buffer .as_mut() - .extend_from_slice(val.to_u64().to_be_bytes().as_slice()); + .extend_from_slice(value.to_be_bytes().as_slice()); } #[cfg(test)] diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 007934ba53..1e95849bd1 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -298,8 +298,16 @@ impl IndexMerger { FieldType::U64(ref options) | FieldType::I64(ref options) | FieldType::F64(ref options) - | FieldType::Bool(ref options) - | FieldType::Date(ref options) => match options.get_fastfield_cardinality() { + | FieldType::Bool(ref options) => match options.get_fastfield_cardinality() { + Some(Cardinality::SingleValue) => { + self.write_single_fast_field(field, fast_field_serializer, doc_id_mapping)?; + } + Some(Cardinality::MultiValues) => { + self.write_multi_fast_field(field, fast_field_serializer, doc_id_mapping)?; + } + None => {} + }, + FieldType::Date(ref options) => match options.get_fastfield_cardinality() { Some(Cardinality::SingleValue) => { self.write_single_fast_field(field, fast_field_serializer, doc_id_mapping)?; } @@ -1081,7 +1089,7 @@ impl IndexMerger { store_writer.store_bytes(&doc_bytes)?; } } else { - store_writer.stack(&store_reader)?; + store_writer.stack(store_reader)?; } } } diff --git a/src/indexer/segment_serializer.rs b/src/indexer/segment_serializer.rs index 554503e668..ffb6a3dc33 100644 --- a/src/indexer/segment_serializer.rs +++ b/src/indexer/segment_serializer.rs @@ -42,7 +42,7 @@ impl SegmentSerializer { let blocksize = segment.index().settings().docstore_blocksize; Ok(SegmentSerializer { segment, - store_writer: StoreWriter::new(store_write, compressor, blocksize), + store_writer: StoreWriter::new(store_write, compressor, blocksize)?, fast_field_serializer, 
fieldnorms_serializer: Some(fieldnorms_serializer), postings_serializer, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 94469c5bca..02fbd56abf 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -14,7 +14,7 @@ use crate::store::{StoreReader, StoreWriter}; use crate::tokenizer::{ BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer, }; -use crate::{DocId, Document, Opstamp, SegmentComponent}; +use crate::{DatePrecision, DocId, Document, Opstamp, SegmentComponent}; /// Computes the initial size of the hash table. /// @@ -248,7 +248,7 @@ impl SegmentWriter { FieldType::Date(_) => { for value in values { let date_val = value.as_date().ok_or_else(make_schema_error)?; - term_buffer.set_u64(date_val.to_u64()); + term_buffer.set_u64(date_val.truncate(DatePrecision::Seconds).to_u64()); postings_writer.subscribe(doc_id, 0u32, term_buffer, ctx); } } @@ -382,7 +382,7 @@ fn remap_and_write( let block_size = serializer.segment().index().settings().docstore_blocksize; let old_store_writer = std::mem::replace( &mut serializer.store_writer, - StoreWriter::new(store_write, compressor, block_size), + StoreWriter::new(store_write, compressor, block_size)?, ); old_store_writer.close()?; let store_read = StoreReader::open( diff --git a/src/lib.rs b/src/lib.rs index 1490709de6..355cea54ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -133,7 +133,7 @@ pub use time; use crate::time::format_description::well_known::Rfc3339; use crate::time::{OffsetDateTime, PrimitiveDateTime, UtcOffset}; -/// A date/time value with second precision. +/// A date/time value with microsecond precision. /// /// This timestamp does not carry any explicit time zone information. /// Users are responsible for applying the provided conversion @@ -145,13 +145,30 @@ use crate::time::{OffsetDateTime, PrimitiveDateTime, UtcOffset}; /// to prevent unintended usage. #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct DateTime { - unix_timestamp: i64, + // Timestamp in microseconds. + pub(crate) timestamp_micros: i64, } impl DateTime { - /// Create new from UNIX timestamp - pub const fn from_unix_timestamp(unix_timestamp: i64) -> Self { - Self { unix_timestamp } + /// Create new from UNIX timestamp in seconds + pub const fn from_timestamp_secs(seconds: i64) -> Self { + Self { + timestamp_micros: seconds * 1_000_000, + } + } + + /// Create new from UNIX timestamp in milliseconds + pub const fn from_timestamp_millis(milliseconds: i64) -> Self { + Self { + timestamp_micros: milliseconds * 1_000, + } + } + + /// Create new from UNIX timestamp in microseconds. + pub const fn from_timestamp_micros(microseconds: i64) -> Self { + Self { + timestamp_micros: microseconds, + } } /// Create new from `OffsetDateTime` @@ -159,7 +176,8 @@ impl DateTime { /// The given date/time is converted to UTC and the actual /// time zone is discarded. pub const fn from_utc(dt: OffsetDateTime) -> Self { - Self::from_unix_timestamp(dt.unix_timestamp()) + let timestamp_micros = dt.unix_timestamp() as i64 * 1_000_000 + dt.microsecond() as i64; + Self { timestamp_micros } } /// Create new from `PrimitiveDateTime` @@ -167,21 +185,30 @@ impl DateTime { /// Implicitly assumes that the given date/time is in UTC! /// Otherwise the original value must only be reobtained with /// [`Self::into_primitive()`]. 
- pub const fn from_primitive(dt: PrimitiveDateTime) -> Self { + pub fn from_primitive(dt: PrimitiveDateTime) -> Self { Self::from_utc(dt.assume_utc()) } - /// Convert to UNIX timestamp - pub const fn into_unix_timestamp(self) -> i64 { - let Self { unix_timestamp } = self; - unix_timestamp + /// Convert to UNIX timestamp in seconds. + pub const fn into_timestamp_secs(self) -> i64 { + self.timestamp_micros / 1_000_000 + } + + /// Convert to UNIX timestamp in milliseconds. + pub const fn into_timestamp_millis(self) -> i64 { + self.timestamp_micros / 1_000 + } + + /// Convert to UNIX timestamp in microseconds. + pub const fn into_timestamp_micros(self) -> i64 { + self.timestamp_micros } /// Convert to UTC `OffsetDateTime` pub fn into_utc(self) -> OffsetDateTime { - let Self { unix_timestamp } = self; - let utc_datetime = - OffsetDateTime::from_unix_timestamp(unix_timestamp).expect("valid UNIX timestamp"); + let timestamp_nanos = self.timestamp_micros as i128 * 1000; + let utc_datetime = OffsetDateTime::from_unix_timestamp_nanos(timestamp_nanos) + .expect("valid UNIX timestamp"); debug_assert_eq!(UtcOffset::UTC, utc_datetime.offset()); utc_datetime } @@ -201,6 +228,18 @@ impl DateTime { debug_assert_eq!(UtcOffset::UTC, utc_datetime.offset()); PrimitiveDateTime::new(utc_datetime.date(), utc_datetime.time()) } + + /// Truncates the microseconds value to the corresponding precision. + pub(crate) fn truncate(self, precision: DatePrecision) -> Self { + let truncated_timestamp_micros = match precision { + DatePrecision::Seconds => (self.timestamp_micros / 1_000_000) * 1_000_000, + DatePrecision::Milliseconds => (self.timestamp_micros / 1_000) * 1_000, + DatePrecision::Microseconds => self.timestamp_micros, + }; + Self { + timestamp_micros: truncated_timestamp_micros, + } + } } impl fmt::Debug for DateTime { @@ -269,7 +308,7 @@ pub use crate::indexer::operation::UserOperation; pub use crate::indexer::{merge_filtered_segments, merge_indices, IndexWriter, PreparedCommit}; pub use crate::postings::Postings; pub use crate::reader::LeasedItem; -pub use crate::schema::{Document, Term}; +pub use crate::schema::{DateOptions, DatePrecision, Document, Term}; /// Index format version. const INDEX_FORMAT_VERSION: u32 = 4; @@ -385,6 +424,7 @@ pub mod tests { use rand::distributions::{Bernoulli, Uniform}; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; + use time::OffsetDateTime; use crate::collector::tests::TEST_COLLECTOR_WITH_SCORE; use crate::core::SegmentReader; @@ -393,7 +433,7 @@ pub mod tests { use crate::merge_policy::NoMergePolicy; use crate::query::BooleanQuery; use crate::schema::*; - use crate::{DocAddress, Index, Postings, ReloadPolicy}; + use crate::{DateTime, DocAddress, Index, Postings, ReloadPolicy}; pub fn fixed_size_test() { let mut buffer = Vec::new(); @@ -1102,4 +1142,35 @@ pub mod tests { assert!(index.validate_checksum()?.is_empty()); Ok(()) } + + #[test] + fn test_datetime() { + let now = OffsetDateTime::now_utc(); + + let dt = DateTime::from_utc(now).into_utc(); + assert_eq!(dt.to_ordinal_date(), now.to_ordinal_date()); + assert_eq!(dt.to_hms_micro(), now.to_hms_micro()); + // We don't store nanosecond level precision. + assert_ne!(dt.to_hms_nano(), now.to_hms_nano()); + + let dt = DateTime::from_timestamp_secs(now.unix_timestamp()).into_utc(); + assert_eq!(dt.to_ordinal_date(), now.to_ordinal_date()); + assert_eq!(dt.to_hms(), now.to_hms()); + // Constructed from a second precision. 
+ assert_ne!(dt.to_hms_micro(), now.to_hms_micro()); + + let dt = + DateTime::from_timestamp_micros((now.unix_timestamp_nanos() / 1_000) as i64).into_utc(); + assert_eq!(dt.to_ordinal_date(), now.to_ordinal_date()); + assert_eq!(dt.to_hms_micro(), now.to_hms_micro()); + + let dt_from_ts_nanos = + OffsetDateTime::from_unix_timestamp_nanos(18446744073709551615i128).unwrap(); + let offset_dt = DateTime::from_utc(dt_from_ts_nanos).into_utc(); + assert_eq!( + dt_from_ts_nanos.to_ordinal_date(), + offset_dt.to_ordinal_date() + ); + assert_eq!(dt_from_ts_nanos.to_hms_micro(), offset_dt.to_hms_micro()); + } } diff --git a/src/query/more_like_this/more_like_this.rs b/src/query/more_like_this/more_like_this.rs index 737e9ea435..299d38ae1a 100644 --- a/src/query/more_like_this/more_like_this.rs +++ b/src/query/more_like_this/more_like_this.rs @@ -243,13 +243,12 @@ impl MoreLikeThis { } FieldType::Date(_) => { for value in values { - // TODO: Ask if this is the semantic (timestamp) we want - let unix_timestamp = value + let timestamp_micros = value .as_date() .ok_or_else(|| TantivyError::InvalidArgument("invalid value".to_string()))? - .into_unix_timestamp(); - if !self.is_noise_word(unix_timestamp.to_string()) { - let term = Term::from_field_i64(field, unix_timestamp); + .into_timestamp_micros(); + if !self.is_noise_word(timestamp_micros.to_string()) { + let term = Term::from_field_i64(field, timestamp_micros); *term_frequencies.entry(term).or_insert(0) += 1; } } diff --git a/src/query/query_parser/query_parser.rs b/src/query/query_parser/query_parser.rs index dbb801e5b3..ecd2b11423 100644 --- a/src/query/query_parser/query_parser.rs +++ b/src/query/query_parser/query_parser.rs @@ -1060,7 +1060,6 @@ mod test { #[test] fn test_json_field_possibly_a_date() { - // Subseconds are discarded test_parse_query_to_logical_ast_helper( r#"json.date:"2019-10-12T07:20:50.52Z""#, r#"(Term(type=Json, field=14, path=date, vtype=Date, 2019-10-12T07:20:50Z) "[(0, Term(type=Json, field=14, path=date, vtype=Str, "2019")), (1, Term(type=Json, field=14, path=date, vtype=Str, "10")), (2, Term(type=Json, field=14, path=date, vtype=Str, "12t07")), (3, Term(type=Json, field=14, path=date, vtype=Str, "20")), (4, Term(type=Json, field=14, path=date, vtype=Str, "50")), (5, Term(type=Json, field=14, path=date, vtype=Str, "52z"))]")"#, @@ -1344,9 +1343,16 @@ mod test { query_parser.parse_query("date:18a"), Err(QueryParserError::DateFormatError(_)) ); - assert!(query_parser - .parse_query("date:\"1985-04-12T23:20:50.52Z\"") - .is_ok()); + test_parse_query_to_logical_ast_helper( + r#"date:"2010-11-21T09:55:06.000000000+02:00""#, + r#"Term(type=Date, field=9, 2010-11-21T07:55:06Z)"#, + true, + ); + test_parse_query_to_logical_ast_helper( + r#"date:"1985-04-12T23:20:50.52Z""#, + r#"Term(type=Date, field=9, 1985-04-12T23:20:50Z)"#, + true, + ); } #[test] diff --git a/src/schema/date_time_options.rs b/src/schema/date_time_options.rs new file mode 100644 index 0000000000..0f6eea978e --- /dev/null +++ b/src/schema/date_time_options.rs @@ -0,0 +1,276 @@ +use std::ops::BitOr; + +use serde::{Deserialize, Serialize}; + +use super::Cardinality; +use crate::schema::flags::{FastFlag, IndexedFlag, SchemaFlagList, StoredFlag}; + +/// DateTime Precision +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum DatePrecision { + /// Seconds precision + Seconds, + /// Milli-seconds precision. + Milliseconds, + /// Micro-seconds precision. 
+ Microseconds, +} + +impl Default for DatePrecision { + fn default() -> Self { + DatePrecision::Seconds + } +} + +/// Defines how DateTime field should be handled by tantivy. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct DateOptions { + indexed: bool, + // This boolean has no effect if the field is not marked as indexed true. + fieldnorms: bool, + #[serde(skip_serializing_if = "Option::is_none")] + fast: Option, + stored: bool, + // Internal storage precision, used to optimize storage + // compression on fast fields. + #[serde(default)] + precision: DatePrecision, +} + +impl DateOptions { + /// Returns true iff the value is stored. + pub fn is_stored(&self) -> bool { + self.stored + } + + /// Returns true iff the value is indexed and therefore searchable. + pub fn is_indexed(&self) -> bool { + self.indexed + } + + /// Returns true iff the field has fieldnorm. + pub fn fieldnorms(&self) -> bool { + self.fieldnorms && self.indexed + } + + /// Returns true iff the value is a fast field and multivalue. + pub fn is_multivalue_fast(&self) -> bool { + if let Some(cardinality) = self.fast { + cardinality == Cardinality::MultiValues + } else { + false + } + } + + /// Returns true iff the value is a fast field. + pub fn is_fast(&self) -> bool { + self.fast.is_some() + } + + /// Set the field as stored. + /// + /// Only the fields that are set as *stored* are + /// persisted into the Tantivy's store. + #[must_use] + pub fn set_stored(mut self) -> DateOptions { + self.stored = true; + self + } + + /// Set the field as indexed. + /// + /// Setting an integer as indexed will generate + /// a posting list for each value taken by the integer. + /// + /// This is required for the field to be searchable. + #[must_use] + pub fn set_indexed(mut self) -> DateOptions { + self.indexed = true; + self + } + + /// Set the field with fieldnorm. + /// + /// Setting an integer as fieldnorm will generate + /// the fieldnorm data for it. + #[must_use] + pub fn set_fieldnorm(mut self) -> DateOptions { + self.fieldnorms = true; + self + } + + /// Set the field as a single-valued fast field. + /// + /// Fast fields are designed for random access. + /// Access time are similar to a random lookup in an array. + /// If more than one value is associated to a fast field, only the last one is + /// kept. + #[must_use] + pub fn set_fast(mut self, cardinality: Cardinality) -> DateOptions { + self.fast = Some(cardinality); + self + } + + /// Returns the cardinality of the fastfield. + /// + /// If the field has not been declared as a fastfield, then + /// the method returns None. + pub fn get_fastfield_cardinality(&self) -> Option { + self.fast + } + + /// Sets the precision for this DateTime field. + /// + /// Internal storage precision, used to optimize storage + /// compression on fast fields. + pub fn set_precision(mut self, precision: DatePrecision) -> DateOptions { + self.precision = precision; + self + } + + /// Returns the storage precision for this DateTime field. + /// + /// Internal storage precision, used to optimize storage + /// compression on fast fields. 
+ pub fn get_precision(&self) -> DatePrecision { + self.precision + } +} + +impl From<()> for DateOptions { + fn from(_: ()) -> DateOptions { + DateOptions::default() + } +} + +impl From for DateOptions { + fn from(_: FastFlag) -> Self { + DateOptions { + indexed: false, + fieldnorms: false, + stored: false, + fast: Some(Cardinality::SingleValue), + ..Default::default() + } + } +} + +impl From for DateOptions { + fn from(_: StoredFlag) -> Self { + DateOptions { + indexed: false, + fieldnorms: false, + stored: true, + fast: None, + ..Default::default() + } + } +} + +impl From for DateOptions { + fn from(_: IndexedFlag) -> Self { + DateOptions { + indexed: true, + fieldnorms: true, + stored: false, + fast: None, + ..Default::default() + } + } +} + +impl> BitOr for DateOptions { + type Output = DateOptions; + + fn bitor(self, other: T) -> DateOptions { + let other = other.into(); + DateOptions { + indexed: self.indexed | other.indexed, + fieldnorms: self.fieldnorms | other.fieldnorms, + stored: self.stored | other.stored, + fast: self.fast.or(other.fast), + precision: self.precision, + } + } +} + +impl From> for DateOptions +where + Head: Clone, + Tail: Clone, + Self: BitOr + From + From, +{ + fn from(head_tail: SchemaFlagList) -> Self { + Self::from(head_tail.head) | Self::from(head_tail.tail) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_date_options_consistent_with_default() { + let date_time_options: DateOptions = serde_json::from_str( + r#"{ + "indexed": false, + "fieldnorms": false, + "stored": false + }"#, + ) + .unwrap(); + assert_eq!(date_time_options, DateOptions::default()); + } + + #[test] + fn test_serialize_date_option() { + let date_options = serde_json::from_str::( + r#" + { + "indexed": true, + "fieldnorms": false, + "stored": false, + "precision": "milliseconds" + }"#, + ) + .unwrap(); + + let date_options_json = serde_json::to_value(&date_options).unwrap(); + assert_eq!( + date_options_json, + serde_json::json!({ + "precision": "milliseconds", + "indexed": true, + "fieldnorms": false, + "stored": false + }) + ); + } + + #[test] + fn test_deserialize_date_options_with_wrong_options() { + assert!(serde_json::from_str::( + r#"{ + "indexed": true, + "fieldnorms": false, + "stored": "wrong_value" + }"# + ) + .unwrap_err() + .to_string() + .contains("expected a boolean")); + + assert!(serde_json::from_str::( + r#"{ + "indexed": true, + "fieldnorms": false, + "stored": false, + "precision": "hours" + }"# + ) + .unwrap_err() + .to_string() + .contains("unknown variant `hours`")); + } +} diff --git a/src/schema/field_entry.rs b/src/schema/field_entry.rs index 78b5891f37..997fbd2564 100644 --- a/src/schema/field_entry.rs +++ b/src/schema/field_entry.rs @@ -2,7 +2,8 @@ use serde::{Deserialize, Serialize}; use crate::schema::bytes_options::BytesOptions; use crate::schema::{ - is_valid_field_name, FacetOptions, FieldType, JsonObjectOptions, NumericOptions, TextOptions, + is_valid_field_name, DateOptions, FacetOptions, FieldType, JsonObjectOptions, NumericOptions, + TextOptions, }; /// A `FieldEntry` represents a field and its configuration. @@ -55,7 +56,7 @@ impl FieldEntry { } /// Creates a new date field entry. 
- pub fn new_date(field_name: String, date_options: NumericOptions) -> FieldEntry { + pub fn new_date(field_name: String, date_options: DateOptions) -> FieldEntry { Self::new(field_name, FieldType::Date(date_options)) } @@ -107,8 +108,8 @@ impl FieldEntry { FieldType::U64(ref options) | FieldType::I64(ref options) | FieldType::F64(ref options) - | FieldType::Date(ref options) | FieldType::Bool(ref options) => options.is_stored(), + FieldType::Date(ref options) => options.is_stored(), FieldType::Str(ref options) => options.is_stored(), FieldType::Facet(ref options) => options.is_stored(), FieldType::Bytes(ref options) => options.is_stored(), diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs index 1080d83fc1..c29c695543 100644 --- a/src/schema/field_type.rs +++ b/src/schema/field_type.rs @@ -5,8 +5,8 @@ use thiserror::Error; use crate::schema::bytes_options::BytesOptions; use crate::schema::facet_options::FacetOptions; use crate::schema::{ - Facet, IndexRecordOption, JsonObjectOptions, NumericOptions, TextFieldIndexing, TextOptions, - Value, + DateOptions, Facet, IndexRecordOption, JsonObjectOptions, NumericOptions, TextFieldIndexing, + TextOptions, Value, }; use crate::time::format_description::well_known::Rfc3339; use crate::time::OffsetDateTime; @@ -27,6 +27,11 @@ pub enum ValueParsingError { expected: &'static str, json: serde_json::Value, }, + #[error("Parse error on {json}: {error}")] + ParseError { + error: String, + json: serde_json::Value, + }, #[error("Invalid base64: {base64}")] InvalidBase64 { base64: String }, } @@ -133,7 +138,7 @@ pub enum FieldType { /// Bool field type configuration Bool(NumericOptions), /// Signed 64-bits Date 64 field type configuration, - Date(NumericOptions), + Date(DateOptions), /// Hierachical Facet Facet(FacetOptions), /// Bytes (one per document) @@ -202,8 +207,8 @@ impl FieldType { FieldType::U64(ref int_options) | FieldType::I64(ref int_options) | FieldType::F64(ref int_options) - | FieldType::Date(ref int_options) - | FieldType::Bool(ref int_options) => int_options.get_fastfield_cardinality().is_some(), + | FieldType::Bool(ref int_options) => int_options.is_fast(), + FieldType::Date(ref date_options) => date_options.is_fast(), FieldType::Facet(_) => true, FieldType::JsonObject(_) => false, } @@ -219,8 +224,8 @@ impl FieldType { FieldType::U64(ref int_options) | FieldType::I64(ref int_options) | FieldType::F64(ref int_options) - | FieldType::Date(ref int_options) | FieldType::Bool(ref int_options) => int_options.fieldnorms(), + FieldType::Date(ref date_options) => date_options.fieldnorms(), FieldType::Facet(_) => false, FieldType::Bytes(ref bytes_options) => bytes_options.fieldnorms(), FieldType::JsonObject(ref _json_object_options) => false, @@ -243,7 +248,6 @@ impl FieldType { FieldType::U64(ref int_options) | FieldType::I64(ref int_options) | FieldType::F64(ref int_options) - | FieldType::Date(ref int_options) | FieldType::Bool(ref int_options) => { if int_options.is_indexed() { Some(IndexRecordOption::Basic) @@ -251,6 +255,13 @@ impl FieldType { None } } + FieldType::Date(ref date_options) => { + if date_options.is_indexed() { + Some(IndexRecordOption::Basic) + } else { + None + } + } FieldType::Facet(ref _facet_options) => Some(IndexRecordOption::Basic), FieldType::Bytes(ref bytes_options) => { if bytes_options.is_indexed() { @@ -273,7 +284,7 @@ impl FieldType { pub fn value_from_json(&self, json: JsonValue) -> Result { match json { JsonValue::String(field_text) => { - match *self { + match self { FieldType::Date(_) => { let 
dt_with_fixed_tz = OffsetDateTime::parse(&field_text, &Rfc3339) .map_err(|_err| ValueParsingError::TypeError { @@ -402,8 +413,8 @@ mod tests { let doc_json = r#"{"date": "2019-10-12T07:20:50.52+02:00"}"#; let doc = schema.parse_document(doc_json).unwrap(); let date = doc.get_first(date_field).unwrap(); - // Time zone is converted to UTC and subseconds are discarded - assert_eq!("Date(2019-10-12T05:20:50Z)", format!("{:?}", date)); + // Time zone is converted to UTC + assert_eq!("Date(2019-10-12T05:20:50.52Z)", format!("{:?}", date)); } #[test] diff --git a/src/schema/flags.rs b/src/schema/flags.rs index 06c12f8ca5..3fa73e7a6e 100644 --- a/src/schema/flags.rs +++ b/src/schema/flags.rs @@ -1,6 +1,7 @@ use std::ops::BitOr; use crate::schema::{NumericOptions, TextOptions}; +use crate::DateOptions; #[derive(Clone)] pub struct StoredFlag; @@ -65,6 +66,14 @@ impl> BitOr for SchemaFlagList> BitOr for SchemaFlagList { + type Output = DateOptions; + + fn bitor(self, rhs: DateOptions) -> Self::Output { + self.head.into() | rhs + } +} + impl> BitOr for SchemaFlagList { type Output = TextOptions; diff --git a/src/schema/mod.rs b/src/schema/mod.rs index b595e0380d..2c145f8be1 100644 --- a/src/schema/mod.rs +++ b/src/schema/mod.rs @@ -117,6 +117,7 @@ mod field_type; mod field_value; mod bytes_options; +mod date_time_options; mod field; mod flags; mod index_record_option; @@ -127,6 +128,7 @@ mod text_options; mod value; pub use self::bytes_options::BytesOptions; +pub use self::date_time_options::{DateOptions, DatePrecision}; pub use self::document::Document; pub(crate) use self::facet::FACET_SEP_BYTE; pub use self::facet::{Facet, FacetParseError}; diff --git a/src/schema/schema.rs b/src/schema/schema.rs index 00aff9f38c..e91a31a625 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -134,7 +134,7 @@ impl SchemaBuilder { /// by the second one. /// The first field will get a field id /// but only the second one will be indexed - pub fn add_date_field>( + pub fn add_date_field>( &mut self, field_name_str: &str, field_options: T, @@ -813,7 +813,7 @@ mod tests { .set_tokenizer("raw") .set_index_option(IndexRecordOption::Basic), ); - let timestamp_options = NumericOptions::default() + let timestamp_options = DateOptions::default() .set_stored() .set_indexed() .set_fieldnorm() @@ -875,7 +875,8 @@ mod tests { "indexed": true, "fieldnorms": true, "fast": "single", - "stored": true + "stored": true, + "precision": "seconds" } }, { diff --git a/src/schema/term.rs b/src/schema/term.rs index a485ef8960..d9b1912a1b 100644 --- a/src/schema/term.rs +++ b/src/schema/term.rs @@ -5,7 +5,7 @@ use std::{fmt, str}; use super::Field; use crate::fastfield::FastValue; use crate::schema::{Facet, Type}; -use crate::DateTime; +use crate::{DatePrecision, DateTime}; /// Size (in bytes) of the buffer of a fast value (u64, i64, f64, or date) term. /// + + @@ -76,7 +76,7 @@ impl Term { /// Builds a term given a field, and a DateTime value pub fn from_field_date(field: Field, val: DateTime) -> Term { - Term::from_fast_value(field, &val) + Term::from_fast_value(field, &val.truncate(DatePrecision::Seconds)) } /// Creates a `Term` given a facet. 
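Note that `Term::from_field_date` now truncates to `DatePrecision::Seconds`, so indexed date terms stay second-granular even though fast fields can carry microseconds. A small sketch of that consequence, assuming `Term`'s derived equality (illustrative, not taken from the patch):

use tantivy::schema::{Schema, Term, INDEXED};
use tantivy::DateTime;

fn main() {
    let mut schema_builder = Schema::builder();
    let occurred_at = schema_builder.add_date_field("occurred_at", INDEXED);
    let _schema = schema_builder.build();

    // Both instants fall within the same second, so they map to the same term.
    let a = DateTime::from_timestamp_micros(1_655_902_430_530_000);
    let b = DateTime::from_timestamp_micros(1_655_902_430_999_999);
    assert_eq!(
        Term::from_field_date(occurred_at, a),
        Term::from_field_date(occurred_at, b)
    );
}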
diff --git a/src/schema/value.rs b/src/schema/value.rs index d5e4f72cde..5335f88452 100644 --- a/src/schema/value.rs +++ b/src/schema/value.rs @@ -24,7 +24,7 @@ pub enum Value { F64(f64), /// Bool value Bool(bool), - /// Date/time with second precision + /// Date/time with microseconds precision Date(DateTime), /// Facet Facet(Facet), @@ -251,7 +251,7 @@ impl<'a> From<&'a [u8]> for Value { } } -impl<'a> From for Value { +impl From for Value { fn from(facet: Facet) -> Value { Value::Facet(facet) } @@ -348,8 +348,10 @@ mod binary_serialize { } Value::Date(ref val) => { DATE_CODE.serialize(writer)?; - let DateTime { unix_timestamp } = val; - unix_timestamp.serialize(writer) + let DateTime { + timestamp_micros, .. + } = val; + timestamp_micros.serialize(writer) } Value::Facet(ref facet) => { HIERARCHICAL_FACET_CODE.serialize(writer)?; @@ -391,8 +393,10 @@ mod binary_serialize { Ok(Value::Bool(value)) } DATE_CODE => { - let unix_timestamp = i64::deserialize(reader)?; - Ok(Value::Date(DateTime::from_unix_timestamp(unix_timestamp))) + let timestamp_micros = i64::deserialize(reader)?; + Ok(Value::Date(DateTime::from_timestamp_micros( + timestamp_micros, + ))) } HIERARCHICAL_FACET_CODE => Ok(Value::Facet(Facet::deserialize(reader)?)), BYTES_CODE => Ok(Value::Bytes(Vec::::deserialize(reader)?)), diff --git a/src/store/index/mod.rs b/src/store/index/mod.rs index 2401e23d6d..af572e758b 100644 --- a/src/store/index/mod.rs +++ b/src/store/index/mod.rs @@ -54,7 +54,7 @@ mod tests { fn test_skip_index_empty() -> io::Result<()> { let mut output: Vec = Vec::new(); let skip_index_builder: SkipIndexBuilder = SkipIndexBuilder::new(); - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); let mut skip_cursor = skip_index.checkpoints(); assert!(skip_cursor.next().is_none()); @@ -70,7 +70,7 @@ mod tests { byte_range: 0..3, }; skip_index_builder.insert(checkpoint.clone()); - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); let mut skip_cursor = skip_index.checkpoints(); assert_eq!(skip_cursor.next(), Some(checkpoint)); @@ -108,7 +108,7 @@ mod tests { for checkpoint in &checkpoints { skip_index_builder.insert(checkpoint.clone()); } - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; let skip_index: SkipIndex = SkipIndex::open(OwnedBytes::new(output)); assert_eq!( @@ -167,7 +167,7 @@ mod tests { for checkpoint in &checkpoints { skip_index_builder.insert(checkpoint.clone()); } - skip_index_builder.write(&mut output)?; + skip_index_builder.serialize_into(&mut output)?; assert_eq!(output.len(), 4035); let resulting_checkpoints: Vec = SkipIndex::open(OwnedBytes::new(output)) .checkpoints() @@ -238,7 +238,7 @@ mod tests { skip_index_builder.insert(checkpoint); } let mut buffer = Vec::new(); - skip_index_builder.write(&mut buffer).unwrap(); + skip_index_builder.serialize_into(&mut buffer).unwrap(); let skip_index = SkipIndex::open(OwnedBytes::new(buffer)); let iter_checkpoints: Vec = skip_index.checkpoints().collect(); assert_eq!(&checkpoints[..], &iter_checkpoints[..]); diff --git a/src/store/index/skip_index_builder.rs b/src/store/index/skip_index_builder.rs index cbb899a219..2f34376cd4 100644 --- a/src/store/index/skip_index_builder.rs +++ b/src/store/index/skip_index_builder.rs @@ -87,7 +87,7 @@ impl SkipIndexBuilder { } } - pub fn write(mut self, output: &mut 
W) -> io::Result<()> { + pub fn serialize_into(mut self, output: &mut W) -> io::Result<()> { let mut last_pointer = None; for skip_layer in self.layers.iter_mut() { if let Some(checkpoint) = last_pointer { diff --git a/src/store/mod.rs b/src/store/mod.rs index 88ef9b579e..8dd035fe7f 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -88,7 +88,7 @@ pub mod tests { schema_builder.add_text_field("title", TextOptions::default().set_stored()); let schema = schema_builder.build(); { - let mut store_writer = StoreWriter::new(writer, compressor, blocksize); + let mut store_writer = StoreWriter::new(writer, compressor, blocksize).unwrap(); for i in 0..num_docs { let mut doc = Document::default(); doc.add_field_value(field_body, LOREM.to_string()); diff --git a/src/store/writer.rs b/src/store/writer.rs index a351d0fcbc..3327dd1e97 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,4 +1,6 @@ use std::io::{self, Write}; +use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; +use std::thread::{self, JoinHandle}; use common::{BinarySerializable, CountingWriter, VInt}; @@ -21,12 +23,19 @@ use crate::DocId; pub struct StoreWriter { compressor: Compressor, block_size: usize, - doc: DocId, - first_doc_in_block: DocId, - offset_index_writer: SkipIndexBuilder, - writer: CountingWriter, + num_docs_in_current_block: DocId, intermediary_buffer: Vec, current_block: Vec, + + // the channel to send data to the compressor thread. + compressor_sender: SyncSender, + // the handle to check for errors on the thread + compressor_thread_handle: JoinHandle>, +} + +enum BlockCompressorMessage { + AddBlock(DocumentBlock), + Stack(StoreReader), } impl StoreWriter { @@ -34,17 +43,43 @@ impl StoreWriter { /// /// The store writer will writes blocks on disc as /// document are added. - pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter { - StoreWriter { + pub fn new( + writer: WritePtr, + compressor: Compressor, + block_size: usize, + ) -> io::Result { + let thread_builder = thread::Builder::new().name("docstore-compressor-thread".to_string()); + + // Channel to send uncompressed data to compressor channel + let (block_sender, block_receiver): ( + SyncSender, + Receiver, + ) = sync_channel(3); + let thread_join_handle = thread_builder.spawn(move || { + let mut block_compressor = BlockCompressor::new(compressor, writer); + while let Ok(packet) = block_receiver.recv() { + match packet { + BlockCompressorMessage::AddBlock(block) => { + block_compressor.compress_block_and_write(block)?; + } + BlockCompressorMessage::Stack(store_reader) => { + block_compressor.stack(store_reader)?; + } + } + } + block_compressor.close()?; + Ok(()) + })?; + + Ok(StoreWriter { compressor, block_size, - doc: 0, - first_doc_in_block: 0, - offset_index_writer: SkipIndexBuilder::new(), - writer: CountingWriter::wrap(writer), + num_docs_in_current_block: 0, intermediary_buffer: Vec::new(), current_block: Vec::new(), - } + compressor_sender: block_sender, + compressor_thread_handle: thread_join_handle, + }) } pub(crate) fn compressor(&self) -> Compressor { @@ -56,18 +91,30 @@ impl StoreWriter { self.intermediary_buffer.capacity() + self.current_block.capacity() } - /// Store bytes of a serialized document. - /// - /// The document id is implicitely the current number - /// of documents. 
- pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { - let doc_num_bytes = serialized_document.len(); - VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; - self.current_block.write_all(serialized_document)?; - self.doc += 1; + /// Checks if the current block is full, and if so, compresses and flushes it. + fn check_flush_block(&mut self) -> io::Result<()> { if self.current_block.len() > self.block_size { - self.write_and_compress_block()?; + self.send_current_block_to_compressor()?; + } + Ok(()) + } + + /// Flushes current uncompressed block and sends to compressor. + fn send_current_block_to_compressor(&mut self) -> io::Result<()> { + // We don't do anything if the current block is empty to begin with. + if self.current_block.is_empty() { + return Ok(()); } + let block = DocumentBlock { + data: self.current_block.to_owned(), + num_docs_in_block: self.num_docs_in_current_block, + }; + self.current_block.clear(); + self.num_docs_in_current_block = 0; + self.compressor_sender + .send(BlockCompressorMessage::AddBlock(block)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + Ok(()) } @@ -82,13 +129,24 @@ impl StoreWriter { // intermediary_buffer due to the borrow checker // a new buffer costs ~1% indexing performance let doc_num_bytes = self.intermediary_buffer.len(); - VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; + VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); self.current_block .write_all(&self.intermediary_buffer[..])?; - self.doc += 1; - if self.current_block.len() > self.block_size { - self.write_and_compress_block()?; - } + self.num_docs_in_current_block += 1; + self.check_flush_block()?; + Ok(()) + } + + /// Store bytes of a serialized document. + /// + /// The document id is implicitely the current number + /// of documents. + pub fn store_bytes(&mut self, serialized_document: &[u8]) -> io::Result<()> { + let doc_num_bytes = serialized_document.len(); + VInt(doc_num_bytes as u64).serialize_into_vec(&mut self.current_block); + self.current_block.extend_from_slice(serialized_document); + self.num_docs_in_current_block += 1; + self.check_flush_block()?; Ok(()) } @@ -96,65 +154,109 @@ impl StoreWriter { /// This method is an optimization compared to iterating over the documents /// in the store and adding them one by one, as the store's data will /// not be decompressed and then recompressed. - pub fn stack(&mut self, store_reader: &StoreReader) -> io::Result<()> { - if !self.current_block.is_empty() { - self.write_and_compress_block()?; - } - assert_eq!(self.first_doc_in_block, self.doc); - let doc_shift = self.doc; - let start_shift = self.writer.written_bytes() as usize; + pub fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { + // We flush the current block first before stacking + self.send_current_block_to_compressor()?; + self.compressor_sender + .send(BlockCompressorMessage::Stack(store_reader)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - // just bulk write all of the block of the given reader. - self.writer - .write_all(store_reader.block_data()?.as_slice())?; + Ok(()) + } + + /// Finalized the store writer. + /// + /// Compress the last unfinished block if any, + /// and serializes the skip list index on disc. 
+ pub fn close(mut self) -> io::Result<()> { + self.send_current_block_to_compressor()?; + drop(self.compressor_sender); + + self.compressor_thread_handle + .join() + .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))??; - // concatenate the index of the `store_reader`, after translating - // its start doc id and its start file offset. - for mut checkpoint in store_reader.block_checkpoints() { - checkpoint.doc_range.start += doc_shift; - checkpoint.doc_range.end += doc_shift; - checkpoint.byte_range.start += start_shift; - checkpoint.byte_range.end += start_shift; - self.register_checkpoint(checkpoint); - } Ok(()) } +} - fn register_checkpoint(&mut self, checkpoint: Checkpoint) { - self.offset_index_writer.insert(checkpoint.clone()); - self.first_doc_in_block = checkpoint.doc_range.end; - self.doc = checkpoint.doc_range.end; +/// BlockCompressor is separated from StoreWriter, to be run in an own thread +pub struct BlockCompressor { + compressor: Compressor, + first_doc_in_block: DocId, + offset_index_writer: SkipIndexBuilder, + intermediary_buffer: Vec, + writer: CountingWriter, +} + +struct DocumentBlock { + data: Vec, + num_docs_in_block: DocId, +} + +impl BlockCompressor { + fn new(compressor: Compressor, writer: WritePtr) -> Self { + Self { + compressor, + first_doc_in_block: 0, + offset_index_writer: SkipIndexBuilder::new(), + intermediary_buffer: Vec::new(), + writer: CountingWriter::wrap(writer), + } } - fn write_and_compress_block(&mut self) -> io::Result<()> { - assert!(self.doc > 0); + fn compress_block_and_write(&mut self, block: DocumentBlock) -> io::Result<()> { + assert!(block.num_docs_in_block > 0); self.intermediary_buffer.clear(); self.compressor - .compress_into(&self.current_block[..], &mut self.intermediary_buffer)?; + .compress_into(&block.data[..], &mut self.intermediary_buffer)?; + let start_offset = self.writer.written_bytes() as usize; self.writer.write_all(&self.intermediary_buffer)?; let end_offset = self.writer.written_bytes() as usize; - let end_doc = self.doc; + self.register_checkpoint(Checkpoint { - doc_range: self.first_doc_in_block..end_doc, + doc_range: self.first_doc_in_block..self.first_doc_in_block + block.num_docs_in_block, byte_range: start_offset..end_offset, }); - self.current_block.clear(); Ok(()) } - /// Finalized the store writer. - /// - /// Compress the last unfinished block if any, - /// and serializes the skip list index on disc. - pub fn close(mut self) -> io::Result<()> { - if !self.current_block.is_empty() { - self.write_and_compress_block()?; + fn register_checkpoint(&mut self, checkpoint: Checkpoint) { + self.offset_index_writer.insert(checkpoint.clone()); + self.first_doc_in_block = checkpoint.doc_range.end; + } + + /// Stacks a store reader on top of the documents written so far. + /// This method is an optimization compared to iterating over the documents + /// in the store and adding them one by one, as the store's data will + /// not be decompressed and then recompressed. + fn stack(&mut self, store_reader: StoreReader) -> io::Result<()> { + let doc_shift = self.first_doc_in_block; + let start_shift = self.writer.written_bytes() as usize; + + // just bulk write all of the block of the given reader. + self.writer + .write_all(store_reader.block_data()?.as_slice())?; + + // concatenate the index of the `store_reader`, after translating + // its start doc id and its start file offset. 
+ for mut checkpoint in store_reader.block_checkpoints() { + checkpoint.doc_range.start += doc_shift; + checkpoint.doc_range.end += doc_shift; + checkpoint.byte_range.start += start_shift; + checkpoint.byte_range.end += start_shift; + self.register_checkpoint(checkpoint); } + Ok(()) + } + fn close(mut self) -> io::Result<()> { let header_offset: u64 = self.writer.written_bytes() as u64; - let footer = DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); - self.offset_index_writer.write(&mut self.writer)?; - footer.serialize(&mut self.writer)?; + let docstore_footer = + DocStoreFooter::new(header_offset, Decompressor::from(self.compressor)); + + self.offset_index_writer.serialize_into(&mut self.writer)?; + docstore_footer.serialize(&mut self.writer)?; self.writer.terminate() } }
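The reworked `StoreWriter` above offloads compression to a dedicated thread: uncompressed blocks are sent over a bounded channel, the `BlockCompressor` owns the output writer, and `close()` hangs up the channel and joins the thread to surface any I/O error. A minimal, dependency-free sketch of that pattern (names and the no-op compression step are illustrative, not tantivy API):

use std::io;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

enum Message {
    AddBlock(Vec<u8>),
}

struct Writer {
    sender: SyncSender<Message>,
    handle: JoinHandle<io::Result<()>>,
}

impl Writer {
    fn new() -> io::Result<Writer> {
        // Bounded channel: at most 3 uncompressed blocks are buffered,
        // applying back-pressure on the indexing thread.
        let (sender, receiver) = sync_channel::<Message>(3);
        let handle: JoinHandle<io::Result<()>> = thread::Builder::new()
            .name("docstore-compressor-thread".to_string())
            .spawn(move || {
                while let Ok(Message::AddBlock(block)) = receiver.recv() {
                    // A real implementation would compress `block` and write it out here.
                    let _ = block;
                }
                Ok(())
            })?;
        Ok(Writer { sender, handle })
    }

    fn close(self) -> io::Result<()> {
        // Dropping the sender ends the worker's recv() loop; joining propagates
        // both panics and io::Errors from the compressor thread.
        drop(self.sender);
        self.handle
            .join()
            .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?
    }
}

fn main() -> io::Result<()> {
    let writer = Writer::new()?;
    writer
        .sender
        .send(Message::AddBlock(b"a serialized document".to_vec()))
        .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
    writer.close()
}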