diff --git a/CHANGELOG.md b/CHANGELOG.md
index 696c09ebf8..db85b14e1d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@ Tantivy 0.17
 - Bugfix that could in theory impact durability on some filesystems [#1224](https://github.com/quickwit-inc/tantivy/issues/1224)
 - Reduce the number of fsync calls [#1225](https://github.com/quickwit-inc/tantivy/issues/1225)
 - Schema now offers not indexing fieldnorms (@lpouget) [#922](https://github.com/quickwit-inc/tantivy/issues/922)
+- LogMergePolicy now triggers merges if the ratio of deleted documents reaches a threshold (@shikhar) [#115](https://github.com/quickwit-inc/tantivy/issues/115)
 
 Tantivy 0.16.2
 ================================
diff --git a/src/core/index_meta.rs b/src/core/index_meta.rs
index 9fa7731e41..6ae8b6702e 100644
--- a/src/core/index_meta.rs
+++ b/src/core/index_meta.rs
@@ -189,6 +189,10 @@ impl SegmentMeta {
 
     #[doc(hidden)]
     pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
+        assert!(
+            num_deleted_docs <= self.max_doc(),
+            "There cannot be more deleted docs than there are docs."
+        );
         let delete_meta = DeleteMeta {
             num_deleted_docs,
             opstamp,
diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs
index fdec9ecd56..74b70b27a0 100644
--- a/src/indexer/index_writer.rs
+++ b/src/indexer/index_writer.rs
@@ -974,7 +974,7 @@ mod tests {
         assert_eq!(
             format!("{:?}", index_writer.get_merge_policy()),
             "LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, min_layer_size: 10000, \
-             level_log_size: 0.75 }"
+             level_log_size: 0.75, del_docs_ratio_before_merge: 1.0 }"
         );
         let merge_policy = Box::new(NoMergePolicy::default());
         index_writer.set_merge_policy(merge_policy);
diff --git a/src/indexer/log_merge_policy.rs b/src/indexer/log_merge_policy.rs
index c47296005d..913834a555 100644
--- a/src/indexer/log_merge_policy.rs
+++ b/src/indexer/log_merge_policy.rs
@@ -2,12 +2,15 @@ use super::merge_policy::{MergeCandidate, MergePolicy};
 use crate::core::SegmentMeta;
 use itertools::Itertools;
 use std::cmp;
-use std::f64;
 
 const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75;
 const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
 const DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE: usize = 8;
 const DEFAULT_MAX_DOCS_BEFORE_MERGE: usize = 10_000_000;
+// The default value of 1 means that deletes are not taken into account when
+// identifying merge candidates. This is not a very sensible default: it was
+// chosen for backward compatibility and might change in the near future.
+const DEFAULT_DEL_DOCS_RATIO_BEFORE_MERGE: f32 = 1.0f32;
 
 /// `LogMergePolicy` tries to merge segments that have a similar number of
 /// documents.
@@ -17,6 +20,7 @@ pub struct LogMergePolicy {
     max_docs_before_merge: usize,
     min_layer_size: u32,
     level_log_size: f64,
+    del_docs_ratio_before_merge: f32,
 }
 
 impl LogMergePolicy {
@@ -52,19 +56,49 @@ impl LogMergePolicy {
     pub fn set_level_log_size(&mut self, level_log_size: f64) {
         self.level_log_size = level_log_size;
     }
+
+    /// Set the maximum ratio of deleted documents tolerated in a segment.
+    ///
+    /// If any segment at a log level exceeds this ratio, a merge
+    /// is triggered for that level.
+    ///
+    /// If there is a single segment at a level, we effectively end up expunging
+    /// deleted documents from it.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `del_docs_ratio_before_merge` is not within `(0, 1]`.
+    pub fn set_del_docs_ratio_before_merge(&mut self, del_docs_ratio_before_merge: f32) {
+        assert!(del_docs_ratio_before_merge <= 1.0f32);
+        assert!(del_docs_ratio_before_merge > 0f32);
+        self.del_docs_ratio_before_merge = del_docs_ratio_before_merge;
+    }
+
+    fn has_segment_above_deletes_threshold(&self, level: &[&SegmentMeta]) -> bool {
+        level
+            .iter()
+            .any(|segment| deletes_ratio(segment) > self.del_docs_ratio_before_merge)
+    }
+}
+
+fn deletes_ratio(segment: &SegmentMeta) -> f32 {
+    if segment.max_doc() == 0 {
+        return 0f32;
+    }
+    segment.num_deleted_docs() as f32 / segment.max_doc() as f32
 }
 
 impl MergePolicy for LogMergePolicy {
     fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
-        let mut size_sorted_segments = segments
+        let size_sorted_segments = segments
             .iter()
-            .filter(|segment_meta| segment_meta.num_docs() <= (self.max_docs_before_merge as u32))
+            .filter(|seg| seg.num_docs() <= (self.max_docs_before_merge as u32))
+            .sorted_by_key(|seg| std::cmp::Reverse(seg.max_doc()))
             .collect::<Vec<&SegmentMeta>>();
-        if size_sorted_segments.len() <= 1 {
+        if size_sorted_segments.is_empty() {
             return vec![];
         }
-        size_sorted_segments.sort_by_key(|seg| std::cmp::Reverse(seg.num_docs()));
         let mut current_max_log_size = f64::MAX;
         let mut levels = vec![];
@@ -82,7 +116,10 @@
 
         levels
             .iter()
-            .filter(|level| level.len() >= self.min_num_segments)
+            .filter(|level| {
+                level.len() >= self.min_num_segments
+                    || self.has_segment_above_deletes_threshold(level)
+            })
             .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
             .collect()
     }
@@ -95,6 +132,7 @@ impl Default for LogMergePolicy {
             max_docs_before_merge: DEFAULT_MAX_DOCS_BEFORE_MERGE,
             min_layer_size: DEFAULT_MIN_LAYER_SIZE,
             level_log_size: DEFAULT_LEVEL_LOG_SIZE,
+            del_docs_ratio_before_merge: DEFAULT_DEL_DOCS_RATIO_BEFORE_MERGE,
         }
     }
 }
@@ -288,4 +326,49 @@ mod tests {
         assert_eq!(result_list[0].0[1], test_input[4].id());
         assert_eq!(result_list[0].0[2], test_input[5].id());
     }
+
+    #[test]
+    fn test_merge_single_segment_with_deletes_below_threshold() {
+        let mut test_merge_policy = test_merge_policy();
+        test_merge_policy.set_del_docs_ratio_before_merge(0.25f32);
+        let test_input = vec![create_random_segment_meta(40_000).with_delete_meta(10_000, 1)];
+        let merge_candidates = test_merge_policy.compute_merge_candidates(&test_input);
+        assert!(merge_candidates.is_empty());
+    }
+
+    #[test]
+    fn test_merge_single_segment_with_deletes_above_threshold() {
+        let mut test_merge_policy = test_merge_policy();
+        test_merge_policy.set_del_docs_ratio_before_merge(0.25f32);
+        let test_input = vec![create_random_segment_meta(40_000).with_delete_meta(10_001, 1)];
+        let merge_candidates = test_merge_policy.compute_merge_candidates(&test_input);
+        assert_eq!(merge_candidates.len(), 1);
+    }
+
+    #[test]
+    fn test_merge_segments_with_deletes_above_threshold_all_in_level() {
+        let mut test_merge_policy = test_merge_policy();
+        test_merge_policy.set_del_docs_ratio_before_merge(0.25f32);
+        let test_input = vec![
+            create_random_segment_meta(40_000).with_delete_meta(10_001, 1),
+            create_random_segment_meta(40_000),
+        ];
+        let merge_candidates = test_merge_policy.compute_merge_candidates(&test_input);
+        assert_eq!(merge_candidates.len(), 1);
+        assert_eq!(merge_candidates[0].0.len(), 2);
+    }
+
+    #[test]
+    fn test_merge_segments_with_deletes_above_threshold_different_level_not_involved() {
+        let mut test_merge_policy = test_merge_policy();
+        test_merge_policy.set_del_docs_ratio_before_merge(0.25f32);
+        let test_input = vec![
+            create_random_segment_meta(100),
+            create_random_segment_meta(40_000).with_delete_meta(10_001, 1),
+        ];
+        let merge_candidates = test_merge_policy.compute_merge_candidates(&test_input);
+        assert_eq!(merge_candidates.len(), 1);
+        assert_eq!(merge_candidates[0].0.len(), 1);
+        assert_eq!(merge_candidates[0].0[0], test_input[1].id());
+    }
 }
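Usage sketch (not part of the patch): the snippet below shows how a user might opt into delete-triggered merges once this lands. Only `set_del_docs_ratio_before_merge` comes from this diff; the schema setup, the `tantivy::merge_policy::LogMergePolicy` import path, and the 0.25 threshold are illustrative assumptions based on the current public API.

```rust
use tantivy::merge_policy::LogMergePolicy;
use tantivy::schema::{Schema, TEXT};
use tantivy::Index;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Minimal index: a single text field, kept in RAM for the example.
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let index_writer = index.writer(50_000_000)?;

    // Start from the default LogMergePolicy and tolerate at most 25% deleted
    // documents per segment; any level containing a segment above that ratio
    // becomes a merge candidate, even if it holds fewer than min_num_segments.
    let mut merge_policy = LogMergePolicy::default();
    merge_policy.set_del_docs_ratio_before_merge(0.25);
    index_writer.set_merge_policy(Box::new(merge_policy));

    Ok(())
}
```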