Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify OrderPreservingInterner allocation strategy ~97% faster (#2677) #2827

Merged
merged 1 commit into from Oct 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
234 changes: 76 additions & 158 deletions arrow/src/row/interner.rs
Expand Up @@ -17,7 +17,6 @@

use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use std::cmp::Ordering;
use std::num::NonZeroU32;
use std::ops::Index;

Expand Down Expand Up @@ -134,27 +133,24 @@ impl OrderPreservingInterner {
/// returning `None` if it cannot be found
pub fn lookup(&self, normalized_key: &[u8]) -> Option<Interned> {
let len = normalized_key.len();
if len <= 1 {
return None;
}

let mut current_slot: Option<&Slot> = None;
let mut bucket = self.bucket.as_ref();
if len > 2 {
for v in normalized_key.iter().take(len - 2) {
let slot_idx = v.checked_sub(1)?;
current_slot = Some(match current_slot {
None => &self.bucket.slots[slot_idx as usize],
Some(b) => &b.child.as_ref()?.slots[slot_idx as usize],
});
if *v == 255 {
bucket = bucket.next.as_ref()?;
} else {
let bucket_idx = v.checked_sub(1)?;
bucket = bucket.slots.get(bucket_idx as usize)?.child.as_ref()?;
}
}
}

if len > 1 {
let slot_idx = normalized_key[len - 2].checked_sub(2)?;
current_slot = Some(match current_slot {
None => &self.bucket.slots[slot_idx as usize],
Some(b) => &b.child.as_ref()?.slots[slot_idx as usize],
});
}

current_slot.as_ref()?.value
let slot_idx = normalized_key[len - 2].checked_sub(2)?;
Some(bucket.slots.get(slot_idx as usize)?.value)
}

/// Returns the interned value for a given [`Interned`]
Expand Down Expand Up @@ -216,9 +212,9 @@ impl Index<Interned> for InternBuffer {
///
/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing
/// the next byte in the generated normalized key
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
struct Slot {
value: Option<Interned>,
value: Interned,
/// Child values less than `self.value` if any
child: Option<Box<Bucket>>,
}
Expand All @@ -230,180 +226,102 @@ struct Slot {
/// * Contain no `0` bytes other than the null terminator
/// * Compare lexicographically in the same manner as the encoded `data`
///
/// The data structure consists of 255 slots, each of which can store a value.
/// The data structure consists of 254 slots, each of which can store a value.
/// Additionally each slot may contain a child bucket, containing values smaller
/// than the value within the slot
///
/// # Allocation Strategy
///
/// To find the insertion point within a Bucket we perform a binary search of the slots, but
/// capping the search range at 4. Visualizing this as a search tree, the root would have 64
/// children, with subsequent non-leaf nodes each containing two children.
///
/// The insertion point is the first empty slot we encounter, otherwise it is the first slot
/// that contains a value greater than the value being inserted
///
/// For example, initially all slots are empty
///
/// ```ignore
/// 0:
/// 1:
/// .
/// .
/// 254:
/// ```
///
/// Insert `1000`
///
/// ```ignore
/// 0:
/// 1:
/// 2:
/// 3: 1000 <- 1. slot is empty, insert here
/// 4:
/// .
/// .
/// 254:
/// ```
///
/// Insert `500`
///
/// ```ignore
/// 0:
/// 1: 500 <- 2. slot is empty, insert here
/// 2:
/// 3: 1000 <- 1. compare against slot value
/// 4.
/// .
/// .
/// 254:
/// ```
/// than the value within the slot.
///
/// Insert `600`
/// Each bucket also may contain a child bucket, containing values greater than
/// all values in the current bucket
///
/// ```ignore
/// 0:
/// 1: 500 <- 2. compare against slot value
/// 2: 600 <- 3. slot is empty, insert here
/// 3: 1000 <- 1. compare against slot value
/// 4.
/// .
/// .
/// 254:
/// ```
/// # Allocation Strategy
///
/// Insert `400`
/// The contiguous slice of slots containing values is searched to find the insertion
/// point for the new value, according to the sort order.
///
/// ```ignore
/// 0: 400 <- 3. slot is empty, insert here
/// 1: 500 <- 2. compare against slot value
/// 2: 600
/// 3: 1000 <- 1. compare against slot value
/// 4.
/// .
/// .
/// 254:
/// ```
/// If the insertion position exceeds 254, the number of slots, the value is inserted
/// into the child bucket of the current bucket.
///
/// Insert `700`
/// If the insertion position already contains a value, the value is inserted into the
/// child bucket of that slot.
///
/// ```ignore
/// 0: 400
/// 1: 500 <- 2. compare against slot value
/// 2: 600 <- 3. slot is occupied and end of search
/// 3: 1000 <- 1. compare against slot value
/// 4.
/// .
/// .
/// 254:
/// ```
///
/// In this case we reach the end of our search and need to insert a value between
/// slots 2 and 3. To do this we create a new bucket under slot 3, and repeat
/// the process for that bucket.
/// If the slot is not occupied, the value is inserted into that slot.
///
/// The final key will consists of the slot indexes visited incremented by 1,
/// The final key consists of the slot indexes visited incremented by 1,
/// with the final value incremented by 2, followed by a null terminator.
///
/// So in the above example we would have
/// Consider the case of the integers `[8, 6, 5, 7]` inserted in that order
///
/// ```ignore
/// 400: &[2, 0]
/// 500: &[3, 0]
/// 600: &[4, 0]
/// 700: &[4, 5, 0]
/// 1000: &[5, 0]
/// 8: &[2, 0]
/// 6: &[1, 2, 0]
/// 5: &[1, 1, 2, 0]
/// 7: &[1, 3, 0]
/// ```
///
/// Note: this allocation strategy is optimised for interning values in sorted order
///
#[derive(Debug, Clone)]
struct Bucket {
slots: Box<[Slot]>,
slots: Vec<Slot>,
/// Bucket containing values larger than all of `slots`
next: Option<Box<Bucket>>,
}

impl Default for Bucket {
fn default() -> Self {
let slots = (0..255).map(|_| Slot::default()).collect::<Vec<_>>().into();
Self { slots }
Self {
slots: Vec::with_capacity(254),
next: None,
}
}
}

impl Bucket {
/// Perform a skewed binary search to find the first slot that is empty or less
///
/// Returns `Ok(idx)` if an exact match is found, otherwise returns `Err(idx)`
/// containing the slot index to insert at
fn insert_pos(&self, values_buf: &InternBuffer, data: &[u8]) -> Result<usize, usize> {
let mut size = self.slots.len() - 1;
let mut left = 0;
let mut right = size;
while left < right {
// Skew binary search to leave gaps of at most 3 elements
let mid = left + (size / 2).min(3);

let slot = &self.slots[mid];
let val = match slot.value {
Some(val) => val,
None => return Err(mid),
};

let cmp = values_buf[val].cmp(data);
if cmp == Ordering::Less {
left = mid + 1;
} else if cmp == Ordering::Greater {
right = mid;
} else {
return Ok(mid);
}

size = right - left;
}
Err(left)
}

/// Insert `data` into this bucket or one of its children, appending the
/// normalized key to `out` as it is constructed
///
/// # Panics
///
/// Panics if the value already exists
fn insert(&mut self, values_buf: &mut InternBuffer, data: &[u8], out: &mut Vec<u8>) {
match self.insert_pos(values_buf, data) {
Ok(_) => unreachable!("value already exists"),
Err(idx) => {
let slot = &mut self.slots[idx];
// Cannot insert a value into slot 254 as would overflow byte, but also
// would prevent inserting any larger values, as the child bucket can
// only contain values less than the slot
if idx != 254 && slot.value.is_none() {
out.push(idx as u8 + 2);
slot.value = Some(values_buf.insert(data))
let slots_len = self.slots.len() as u8;
// We optimise the case of inserting a value directly after those already inserted
// as [`OrderPreservingInterner::intern`] sorts values prior to interning them
match self.slots.last() {
Some(slot) => {
if &values_buf[slot.value] < data {
if slots_len == 254 {
out.push(255);
self.next
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there coverage of this branch? I think it happens when data is inserted in backwards sorted order, right? Maybe we can add at test to ensure it is covered?

Copy link
Contributor Author

@tustvold tustvold Oct 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is hit when the current bucket is "full" and it is trying to insert a value that is greater than the last value in the bucket. This therefore occurs when interning more than 255 values in order, and is therefore hit by test_interner() in particular

let mut values: Vec<_> = (0_u64..2000).collect();
test_intern_values(&values);

I have confirmed this with a debugger

.get_or_insert_with(Default::default)
.insert(values_buf, data, out)
} else {
out.push(slots_len + 2);
let value = values_buf.insert(data);
self.slots.push(Slot { value, child: None });
}
} else {
out.push(idx as u8 + 1);
slot.child
.get_or_insert_with(Default::default)
.insert(values_buf, data, out);
// Find insertion point
match self
.slots
.binary_search_by(|slot| values_buf[slot.value].cmp(data))
{
Ok(_) => unreachable!("value already exists"),
Err(idx) => {
out.push(idx as u8 + 1);
self.slots[idx]
.child
.get_or_insert_with(Default::default)
.insert(values_buf, data, out)
}
}
}
}
None => {
out.push(2);
let value = values_buf.insert(data);
self.slots.push(Slot { value, child: None })
}
}
}
}
Expand Down