Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use hashbrown::RawTable API #2765

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ name = "builder"
harness = false
required-features = ["test_utils"]

[[bench]]
name = "string_dictionary_builder"
harness = false
required-features = ["test_utils"]

[[bench]]
name = "buffer_bit_ops"
harness = false
Expand Down
55 changes: 27 additions & 28 deletions arrow/src/array/builder/string_dictionary_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use crate::array::{
};
use crate::datatypes::{ArrowDictionaryKeyType, ArrowNativeType, DataType};
use crate::error::{ArrowError, Result};
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use hashbrown::raw::RawTable;
use std::any::Any;
use std::sync::Arc;

Expand Down Expand Up @@ -65,23 +64,32 @@ use std::sync::Arc;
/// assert_eq!(ava.value(1), "def");
///
/// ```
#[derive(Debug)]
pub struct StringDictionaryBuilder<K>
where
K: ArrowDictionaryKeyType,
{
state: ahash::RandomState,
/// Used to provide a lookup from string value to key type
///
/// Note: K's hash implementation is not used, instead the raw entry
/// API is used to store keys w.r.t the hash of the strings themselves
///
dedup: HashMap<K::Native, (), ()>,
/// Stores the dictionary keys for a given string value
dedup: RawTable<K::Native>,

keys_builder: PrimitiveBuilder<K>,
values_builder: StringBuilder,
}

/// Debug formatting for [`StringDictionaryBuilder`].
///
/// Only `keys_builder` and `values_builder` are reported; the `state` hasher
/// and the `dedup` lookup table are not part of the output.
impl<K> std::fmt::Debug for StringDictionaryBuilder<K>
where
    K: ArrowDictionaryKeyType + std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StringDictionaryBuilder");
        out.field("keys_builder", &self.keys_builder);
        out.field("values_builder", &self.values_builder);
        out.finish()
    }
}

impl<K> Default for StringDictionaryBuilder<K>
where
K: ArrowDictionaryKeyType,
Expand All @@ -101,7 +109,7 @@ where
let values_builder = StringBuilder::new();
Self {
state: Default::default(),
dedup: HashMap::with_capacity_and_hasher(keys_builder.capacity(), ()),
dedup: RawTable::with_capacity(keys_builder.capacity()),
keys_builder,
values_builder,
}
Expand Down Expand Up @@ -156,7 +164,7 @@ where
let state = ahash::RandomState::default();
let dict_len = dictionary_values.len();

let mut dedup = HashMap::with_capacity_and_hasher(dict_len, ());
let mut dedup = RawTable::with_capacity(dict_len);

let values_len = dictionary_values.value_data().len();
let mut values_builder = StringBuilder::with_capacity(dict_len, values_len);
Expand All @@ -169,16 +177,10 @@ where
let key = K::Native::from_usize(idx)
.ok_or(ArrowError::DictionaryKeyOverflowError)?;

let entry =
dedup.raw_entry_mut().from_hash(hash, |key: &K::Native| {
value.as_bytes() == get_bytes(&values_builder, key)
});

if let RawEntryMut::Vacant(v) = entry {
v.insert_with_hasher(hash, key, (), |key| {
state.hash_one(get_bytes(&values_builder, key))
});
}
// Note: this will insert duplicates if dictionary_values contains duplicates
dedup.insert(hash, key, |key| {
state.hash_one(get_bytes(&values_builder, key))
});

values_builder.append_value(value);
}
Expand Down Expand Up @@ -248,22 +250,19 @@ where

let entry = self
.dedup
.raw_entry_mut()
.from_hash(hash, |key| value.as_bytes() == get_bytes(storage, key));
.get(hash, |key| value.as_bytes() == get_bytes(storage, key));

let key = match entry {
RawEntryMut::Occupied(entry) => *entry.into_key(),
RawEntryMut::Vacant(entry) => {
Some(k) => *k,
None => {
let index = storage.len();
storage.append_value(value);
let key = K::Native::from_usize(index)
.ok_or(ArrowError::DictionaryKeyOverflowError)?;

*entry
.insert_with_hasher(hash, key, (), |key| {
state.hash_one(get_bytes(storage, key))
})
.0
self.dedup
.insert(hash, key, |key| state.hash_one(get_bytes(storage, key)));
key
}
};
self.keys_builder.append_value(key);
Expand Down
53 changes: 25 additions & 28 deletions arrow/src/row/interner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use hashbrown::raw::RawTable;
use std::cmp::Ordering;
use std::num::NonZeroU32;
use std::ops::Index;
Expand All @@ -27,7 +26,7 @@ pub struct Interned(NonZeroU32); // We use NonZeroU32 so that `Option<Interned>`

/// A byte array interner that generates normalized keys that are sorted with respect
/// to the interned values, e.g. `intern(a) < intern(b) => a < b`
#[derive(Debug, Default)]
#[derive(Default)]
pub struct OrderPreservingInterner {
/// Provides a lookup from [`Interned`] to the normalized key
keys: InternBuffer,
Expand All @@ -38,7 +37,17 @@ pub struct OrderPreservingInterner {

// A hash table used to perform faster re-keying, and detect duplicates
hasher: ahash::RandomState,
lookup: HashMap<Interned, (), ()>,
lookup: RawTable<Interned>,
}

/// Debug formatting for [`OrderPreservingInterner`].
///
/// Reports the `keys`, `values` and `bucket` fields; the `hasher` and the
/// `lookup` table are not part of the output.
impl std::fmt::Debug for OrderPreservingInterner {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OrderPreservingInterner");
        out.field("keys", &self.keys);
        out.field("values", &self.values);
        out.field("bucket", &self.bucket);
        out.finish()
    }
}

impl OrderPreservingInterner {
Expand Down Expand Up @@ -69,14 +78,10 @@ impl OrderPreservingInterner {

let v = value.as_ref();
let hash = self.hasher.hash_one(v);
let entry = self
.lookup
.raw_entry_mut()
.from_hash(hash, |a| &self.values[*a] == v);

match entry {
RawEntryMut::Occupied(o) => out.push(Some(*o.key())),
RawEntryMut::Vacant(_) => {

match self.lookup.get(hash, |a| &self.values[*a] == v) {
Some(key) => out.push(Some(*key)),
None => {
// Push placeholder
out.push(None);
to_intern_len += v.len();
Expand All @@ -93,29 +98,21 @@ impl OrderPreservingInterner {
self.values.values.reserve(to_intern_len);

for (idx, hash, value) in to_intern {
let val = value.as_ref();

let entry = self
.lookup
.raw_entry_mut()
.from_hash(hash, |a| &self.values[*a] == val);

match entry {
RawEntryMut::Occupied(o) => {
out[idx] = Some(*o.key());
let v = value.as_ref();
match self.lookup.get(hash, |a| &self.values[*a] == v) {
Some(key) => {
out[idx] = Some(*key);
}
RawEntryMut::Vacant(v) => {
let val = value.as_ref();
None => {
self.bucket
.insert(&mut self.values, val, &mut self.keys.values);
.insert(&mut self.values, v, &mut self.keys.values);
self.keys.values.push(0);
let interned = self.keys.append();

let hasher = &mut self.hasher;
let values = &self.values;
v.insert_with_hasher(hash, interned, (), |key| {
hasher.hash_one(&values[*key])
});
self.lookup
.insert(hash, interned, |key| hasher.hash_one(&values[*key]));
out[idx] = Some(interned);
}
}
Expand Down
42 changes: 20 additions & 22 deletions parquet/src/util/interner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
// under the License.

use crate::data_type::AsBytes;
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use hashbrown::raw::RawTable;

const DEFAULT_DEDUP_CAPACITY: usize = 4096;

Expand All @@ -35,26 +34,28 @@ pub trait Storage {
}

/// A generic value interner supporting various different [`Storage`]
#[derive(Debug, Default)]
#[derive(Default)]
pub struct Interner<S: Storage> {
state: ahash::RandomState,

/// Used to provide a lookup from value to unique value
///
/// Note: `S::Key`'s hash implementation is not used, instead the raw entry
/// API is used to store keys w.r.t the hash of the strings themselves
///
dedup: HashMap<S::Key, (), ()>,
/// Used to provide a lookup from `S::Value` to `S::Key`
dedup: RawTable<S::Key>,

storage: S,
}

/// Debug formatting for [`Interner`].
///
/// Rendered as a tuple struct wrapping only `storage`; the `state` hasher and
/// the `dedup` table are not part of the output.
impl<S> std::fmt::Debug for Interner<S>
where
    S: Storage + std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_tuple("Interner");
        out.field(&self.storage);
        out.finish()
    }
}

impl<S: Storage> Interner<S> {
/// Create a new `Interner` with the provided storage
pub fn new(storage: S) -> Self {
Self {
state: Default::default(),
dedup: HashMap::with_capacity_and_hasher(DEFAULT_DEDUP_CAPACITY, ()),
dedup: RawTable::with_capacity(DEFAULT_DEDUP_CAPACITY),
storage,
}
}
Expand All @@ -63,21 +64,18 @@ impl<S: Storage> Interner<S> {
pub fn intern(&mut self, value: &S::Value) -> S::Key {
let hash = self.state.hash_one(value.as_bytes());

let entry = self
let maybe_key = self
.dedup
.raw_entry_mut()
.from_hash(hash, |index| value == self.storage.get(*index));
.get(hash, |index| value == self.storage.get(*index));

match entry {
RawEntryMut::Occupied(entry) => *entry.into_key(),
RawEntryMut::Vacant(entry) => {
match maybe_key {
Some(key) => *key,
None => {
let key = self.storage.push(value);

*entry
.insert_with_hasher(hash, key, (), |key| {
self.state.hash_one(self.storage.get(*key).as_bytes())
})
.0
self.dedup.insert(hash, key, |key| {
self.state.hash_one(self.storage.get(*key).as_bytes())
});
key
}
}
}
Expand Down