Skip to content

Commit

Permalink
Simplify OrderPreservingInterner allocation strategy (apache#2677)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Oct 5, 2022
1 parent 6866690 commit 4c4b993
Showing 1 changed file with 76 additions and 158 deletions.
234 changes: 76 additions & 158 deletions arrow/src/row/interner.rs
Expand Up @@ -17,7 +17,6 @@

use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use std::cmp::Ordering;
use std::num::NonZeroU32;
use std::ops::Index;

Expand Down Expand Up @@ -134,27 +133,24 @@ impl OrderPreservingInterner {
/// returning `None` if it cannot be found
pub fn lookup(&self, normalized_key: &[u8]) -> Option<Interned> {
let len = normalized_key.len();
if len <= 1 {
return None;
}

let mut current_slot: Option<&Slot> = None;
let mut bucket = self.bucket.as_ref();
if len > 2 {
for v in normalized_key.iter().take(len - 2) {
let slot_idx = v.checked_sub(1)?;
current_slot = Some(match current_slot {
None => &self.bucket.slots[slot_idx as usize],
Some(b) => &b.child.as_ref()?.slots[slot_idx as usize],
});
if *v == 255 {
bucket = bucket.next.as_ref()?;
} else {
let bucket_idx = v.checked_sub(1)?;
bucket = bucket.slots.get(bucket_idx as usize)?.child.as_ref()?;
}
}
}

if len > 1 {
let slot_idx = normalized_key[len - 2].checked_sub(2)?;
current_slot = Some(match current_slot {
None => &self.bucket.slots[slot_idx as usize],
Some(b) => &b.child.as_ref()?.slots[slot_idx as usize],
});
}

current_slot.as_ref()?.value
let slot_idx = normalized_key[len - 2].checked_sub(2)?;
Some(bucket.slots.get(slot_idx as usize)?.value)
}

/// Returns the interned value for a given [`Interned`]
Expand Down Expand Up @@ -216,9 +212,9 @@ impl Index<Interned> for InternBuffer {
///
/// It may contain a value, if not the first slot, and may contain a child [`Bucket`] representing
/// the next byte in the generated normalized key
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
struct Slot {
value: Option<Interned>,
value: Interned,
/// Child values less than `self.value` if any
child: Option<Box<Bucket>>,
}
Expand All @@ -230,180 +226,102 @@ struct Slot {
/// * Contain no `0` bytes other than the null terminator
/// * Compare lexicographically in the same manner as the encoded `data`
///
/// The data structure consists of 255 slots, each of which can store a value.
/// The data structure consists of 254 slots, each of which can store a value.
/// Additionally each slot may contain a child bucket, containing values smaller
/// than the value within the slot
///
/// # Allocation Strategy
///
/// To find the insertion point within a Bucket we perform a binary search of the slots, but
/// capping the search range at 4. Visualizing this as a search tree, the root would have 64
/// children, with subsequent non-leaf nodes each containing two children.
///
/// The insertion point is the first empty slot we encounter, otherwise it is the first slot
/// that contains a value greater than the value being inserted
///
/// For example, initially all slots are empty
///
/// ```ignore
/// 0:
/// 1:
/// .
/// .
/// 254:
/// ```
///
/// Insert `1000`
///
/// ```ignore
/// 0:
/// 1:
/// 2:
/// 3: 1000 <- 1. slot is empty, insert here
/// 4:
/// .
/// .
/// 254:
/// ```
///
/// Insert `500`
///
/// ```ignore
/// 0:
/// 1: 500 <- 2. slot is empty, insert here
/// 2:
/// 3: 1000 <- 1. compare against slot value
/// 4.
/// .
/// .
/// 254:
/// ```
/// than the value within the slot.
///
/// Insert `600`
/// Each bucket also may contain a child bucket, containing values greater than
/// all values in the current bucket
///
/// ```ignore
/// 0:
/// 1: 500 <- 2. compare against slot value
/// 2: 600 <- 3. slot is empty, insert here
/// 3: 1000 <- 1. compare against slot value
/// 4.
/// .
/// .
/// 254:
/// ```
/// # Allocation Strategy
///
/// Insert `400`
/// The contiguous slice of slots containing values is searched to find the insertion
/// point for the new value, according to the sort order.
///
/// ```ignore
/// 0: 400 <- 3. slot is empty, insert here
/// 1: 500 <- 2. compare against slot value
/// 2: 600
/// 3: 1000 <- 1. compare against slot value
/// 4.
/// .
/// .
/// 254:
/// ```
/// If the insertion position exceeds 254, the number of slots, the value is inserted
/// into the child bucket of the current bucket.
///
/// Insert `700`
/// If the insertion position already contains a value, the value is inserted into the
/// child bucket of that slot.
///
/// ```ignore
/// 0: 400
/// 1: 500 <- 2. compare against slot value
/// 2: 600 <- 3. slot is occupied and end of search
/// 3: 1000 <- 1. compare against slot value
/// 4.
/// .
/// .
/// 254:
/// ```
///
/// In this case we reach the end of our search and need to insert a value between
/// slots 2 and 3. To do this we create a new bucket under slot 3, and repeat
/// the process for that bucket.
/// If the slot is not occupied, the value is inserted into that slot.
///
/// The final key will consists of the slot indexes visited incremented by 1,
/// The final key consists of the slot indexes visited incremented by 1,
/// with the final value incremented by 2, followed by a null terminator.
///
/// So in the above example we would have
/// Consider the case of the integers `[8, 6, 5, 7]` inserted in that order
///
/// ```ignore
/// 400: &[2, 0]
/// 500: &[3, 0]
/// 600: &[4, 0]
/// 700: &[4, 5, 0]
/// 1000: &[5, 0]
/// 8: &[2, 0]
/// 6: &[1, 2, 0]
/// 5: &[1, 1, 2, 0]
/// 7: &[1, 3, 0]
/// ```
///
/// Note: this allocation strategy is optimised for interning values in sorted order
///
#[derive(Debug, Clone)]
struct Bucket {
slots: Box<[Slot]>,
slots: Vec<Slot>,
/// Bucket containing values larger than all of `slots`
next: Option<Box<Bucket>>,
}

impl Default for Bucket {
fn default() -> Self {
let slots = (0..255).map(|_| Slot::default()).collect::<Vec<_>>().into();
Self { slots }
Self {
slots: Vec::with_capacity(254),
next: None,
}
}
}

impl Bucket {
/// Perform a skewed binary search to find the first slot that is empty or less
///
/// Returns `Ok(idx)` if an exact match is found, otherwise returns `Err(idx)`
/// containing the slot index to insert at
fn insert_pos(&self, values_buf: &InternBuffer, data: &[u8]) -> Result<usize, usize> {
let mut size = self.slots.len() - 1;
let mut left = 0;
let mut right = size;
while left < right {
// Skew binary search to leave gaps of at most 3 elements
let mid = left + (size / 2).min(3);

let slot = &self.slots[mid];
let val = match slot.value {
Some(val) => val,
None => return Err(mid),
};

let cmp = values_buf[val].cmp(data);
if cmp == Ordering::Less {
left = mid + 1;
} else if cmp == Ordering::Greater {
right = mid;
} else {
return Ok(mid);
}

size = right - left;
}
Err(left)
}

/// Insert `data` into this bucket or one of its children, appending the
/// normalized key to `out` as it is constructed
///
/// # Panics
///
/// Panics if the value already exists
fn insert(&mut self, values_buf: &mut InternBuffer, data: &[u8], out: &mut Vec<u8>) {
match self.insert_pos(values_buf, data) {
Ok(_) => unreachable!("value already exists"),
Err(idx) => {
let slot = &mut self.slots[idx];
// Cannot insert a value into slot 254 as would overflow byte, but also
// would prevent inserting any larger values, as the child bucket can
// only contain values less than the slot
if idx != 254 && slot.value.is_none() {
out.push(idx as u8 + 2);
slot.value = Some(values_buf.insert(data))
let slots_len = self.slots.len() as u8;
// We optimise the case of inserting a value directly after those already inserted
// as [`OrderPreservingInterner::intern`] sorts values prior to interning them
match self.slots.last() {
Some(slot) => {
if &values_buf[slot.value] < data {
if slots_len == 254 {
out.push(255);
self.next
.get_or_insert_with(Default::default)
.insert(values_buf, data, out)
} else {
out.push(slots_len + 2);
let value = values_buf.insert(data);
self.slots.push(Slot { value, child: None });
}
} else {
out.push(idx as u8 + 1);
slot.child
.get_or_insert_with(Default::default)
.insert(values_buf, data, out);
// Find insertion point
match self
.slots
.binary_search_by(|slot| values_buf[slot.value].cmp(data))
{
Ok(_) => unreachable!("value already exists"),
Err(idx) => {
out.push(idx as u8 + 1);
self.slots[idx]
.child
.get_or_insert_with(Default::default)
.insert(values_buf, data, out)
}
}
}
}
None => {
out.push(2);
let value = values_buf.insert(data);
self.slots.push(Slot { value, child: None })
}
}
}
}
Expand Down

0 comments on commit 4c4b993

Please sign in to comment.