Skip to content

Commit

Permalink
Faster StringDictionaryBuilder (#1851) (#1861)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jun 29, 2022
1 parent 8bb494e commit 903b24a
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 43 deletions.
2 changes: 2 additions & 0 deletions arrow/Cargo.toml
Expand Up @@ -38,13 +38,15 @@ path = "src/lib.rs"
bench = false

[dependencies]
ahash = { version = "0.7", default-features = false }
serde = { version = "1.0", default-features = false }
serde_derive = { version = "1.0", default-features = false }
serde_json = { version = "1.0", default-features = false, features = ["preserve_order"] }
indexmap = { version = "1.9", default-features = false, features = ["std"] }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
num = { version = "0.4", default-features = false, features = ["std"] }
half = { version = "2.0", default-features = false }
hashbrown = { version = "0.12", default-features = false }
csv_crate = { version = "1.1", default-features = false, optional = true, package="csv" }
regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] }
lazy_static = { version = "1.4", default-features = false }
Expand Down
70 changes: 70 additions & 0 deletions arrow/benches/string_dictionary_builder.rs
@@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{Int32Builder, StringBuilder, StringDictionaryBuilder};
use criterion::{criterion_group, criterion_main, Criterion};
use rand::{thread_rng, Rng};

/// Note: this is best effort, not all keys are necessarily present or unique
fn build_strings(dict_size: usize, total_size: usize, key_len: usize) -> Vec<String> {
let mut rng = thread_rng();
let values: Vec<String> = (0..dict_size)
.map(|_| (0..key_len).map(|_| rng.gen::<char>()).collect())
.collect();

(0..total_size)
.map(|_| values[rng.gen_range(0..dict_size)].clone())
.collect()
}

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("string_dictionary_builder");

let mut do_bench = |dict_size: usize, total_size: usize, key_len: usize| {
group.bench_function(
format!(
"(dict_size:{}, len:{}, key_len: {})",
dict_size, total_size, key_len
),
|b| {
let strings = build_strings(dict_size, total_size, key_len);
b.iter(|| {
let keys = Int32Builder::new(strings.len());
let values = StringBuilder::new((key_len + 1) * dict_size);
let mut builder = StringDictionaryBuilder::new(keys, values);

for val in &strings {
builder.append(val).unwrap();
}

builder.finish();
})
},
);
};

do_bench(20, 1000, 5);
do_bench(100, 1000, 5);
do_bench(100, 1000, 10);
do_bench(100, 10000, 10);
do_bench(100, 10000, 100);

group.finish();
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
10 changes: 10 additions & 0 deletions arrow/src/array/builder/generic_list_builder.rs
Expand Up @@ -107,6 +107,11 @@ where
&mut self.values_builder
}

/// Returns the child array builder as an immutable reference
pub fn values_ref(&self) -> &T {
&self.values_builder
}

/// Finish the current variable-length list array slot
#[inline]
pub fn append(&mut self, is_valid: bool) -> Result<()> {
Expand Down Expand Up @@ -152,6 +157,11 @@ where

GenericListArray::<OffsetSize>::from(array_data)
}

/// Returns the current offsets buffer as a slice
pub fn offsets_slice(&self) -> &[OffsetSize] {
self.offsets_builder.as_slice()
}
}

#[cfg(test)]
Expand Down
10 changes: 10 additions & 0 deletions arrow/src/array/builder/generic_string_builder.rs
Expand Up @@ -87,6 +87,16 @@ impl<OffsetSize: OffsetSizeTrait> GenericStringBuilder<OffsetSize> {
pub fn finish(&mut self) -> GenericStringArray<OffsetSize> {
GenericStringArray::<OffsetSize>::from(self.builder.finish())
}

/// Returns the current values buffer as a slice
pub fn values_slice(&self) -> &[u8] {
self.builder.values_ref().values_slice()
}

/// Returns the current offsets buffer as a slice
pub fn offsets_slice(&self) -> &[OffsetSize] {
self.builder.offsets_slice()
}
}

impl<OffsetSize: OffsetSizeTrait> ArrayBuilder for GenericStringBuilder<OffsetSize> {
Expand Down
5 changes: 5 additions & 0 deletions arrow/src/array/builder/primitive_builder.rs
Expand Up @@ -230,6 +230,11 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
b.append_n(self.values_builder.len(), true);
self.bitmap_builder = Some(b);
}

/// Returns the current values buffer as a slice
pub fn values_slice(&self) -> &[T::Native] {
self.values_builder.as_slice()
}
}

#[cfg(test)]
Expand Down
141 changes: 98 additions & 43 deletions arrow/src/array/builder/string_dictionary_builder.rs
Expand Up @@ -15,21 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use super::PrimitiveBuilder;
use crate::array::{
Array, ArrayBuilder, ArrayRef, DictionaryArray, StringArray, StringBuilder,
};
use crate::datatypes::{ArrowDictionaryKeyType, ArrowNativeType};
use crate::error::{ArrowError, Result};
use hashbrown::hash_map::RawEntryMut;
use hashbrown::HashMap;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use crate::array::array::Array;
use crate::array::ArrayBuilder;
use crate::array::ArrayRef;
use crate::array::ArrowDictionaryKeyType;
use crate::array::DictionaryArray;
use crate::array::PrimitiveBuilder;
use crate::array::StringArray;
use crate::array::StringBuilder;
use crate::datatypes::ArrowNativeType;
use crate::error::{ArrowError, Result};

/// Array builder for `DictionaryArray` that stores Strings. For example to map a set of byte indices
/// to String values. Note that the use of a `HashMap` here will not scale to very large
/// arrays or result in an ordered dictionary.
Expand Down Expand Up @@ -76,9 +72,16 @@ pub struct StringDictionaryBuilder<K>
where
K: ArrowDictionaryKeyType,
{
state: ahash::RandomState,
/// Used to provide a lookup from string value to key type
///
/// Note: K's hash implementation is not used, instead the raw entry
/// API is used to store keys w.r.t the hash of the strings themselves
///
dedup: HashMap<K::Native, (), ()>,

keys_builder: PrimitiveBuilder<K>,
values_builder: StringBuilder,
map: HashMap<Box<[u8]>, K::Native>,
}

impl<K> StringDictionaryBuilder<K>
Expand All @@ -88,9 +91,10 @@ where
/// Creates a new `StringDictionaryBuilder` from a keys builder and a value builder.
pub fn new(keys_builder: PrimitiveBuilder<K>, values_builder: StringBuilder) -> Self {
Self {
state: Default::default(),
dedup: HashMap::with_capacity_and_hasher(keys_builder.capacity(), ()),
keys_builder,
values_builder,
map: HashMap::new(),
}
}

Expand Down Expand Up @@ -122,27 +126,44 @@ where
keys_builder: PrimitiveBuilder<K>,
dictionary_values: &StringArray,
) -> Result<Self> {
let state = ahash::RandomState::default();
let dict_len = dictionary_values.len();
let mut values_builder =
StringBuilder::with_capacity(dict_len, dictionary_values.value_data().len());
let mut map: HashMap<Box<[u8]>, K::Native> = HashMap::with_capacity(dict_len);
for i in 0..dict_len {
if dictionary_values.is_valid(i) {
let value = dictionary_values.value(i);
map.insert(
value.as_bytes().into(),
K::Native::from_usize(i)
.ok_or(ArrowError::DictionaryKeyOverflowError)?,
);
values_builder.append_value(value)?;
} else {
values_builder.append_null()?;

let mut dedup = HashMap::with_capacity_and_hasher(dict_len, ());

let values_len = dictionary_values.value_data().len();
let mut values_builder = StringBuilder::with_capacity(dict_len, values_len);

for (idx, maybe_value) in dictionary_values.iter().enumerate() {
match maybe_value {
Some(value) => {
let hash = compute_hash(&state, value.as_bytes());

let key = K::Native::from_usize(idx)
.ok_or(ArrowError::DictionaryKeyOverflowError)?;

let entry =
dedup.raw_entry_mut().from_hash(hash, |key: &K::Native| {
value.as_bytes() == get_bytes(&values_builder, key)
});

if let RawEntryMut::Vacant(v) = entry {
v.insert_with_hasher(hash, key, (), |key| {
compute_hash(&state, get_bytes(&values_builder, key))
});
}

values_builder.append_value(value)?;
}
None => values_builder.append_null()?,
}
}

Ok(Self {
state,
dedup,
keys_builder,
values_builder,
map,
})
}
}
Expand Down Expand Up @@ -190,19 +211,35 @@ where
/// if already present in the values array or a new index if the
/// value is appended to the values array.
pub fn append(&mut self, value: impl AsRef<str>) -> Result<K::Native> {
if let Some(&key) = self.map.get(value.as_ref().as_bytes()) {
// Append existing value.
self.keys_builder.append_value(key)?;
Ok(key)
} else {
// Append new value.
let key = K::Native::from_usize(self.values_builder.len())
.ok_or(ArrowError::DictionaryKeyOverflowError)?;
self.values_builder.append_value(value.as_ref())?;
self.keys_builder.append_value(key as K::Native)?;
self.map.insert(value.as_ref().as_bytes().into(), key);
Ok(key)
}
let value = value.as_ref();

let state = &self.state;
let storage = &mut self.values_builder;
let hash = compute_hash(state, value.as_bytes());

let entry = self
.dedup
.raw_entry_mut()
.from_hash(hash, |key| value.as_bytes() == get_bytes(storage, key));

let key = match entry {
RawEntryMut::Occupied(entry) => *entry.into_key(),
RawEntryMut::Vacant(entry) => {
let index = storage.len();
storage.append_value(value)?;
let key = K::Native::from_usize(index)
.ok_or(ArrowError::DictionaryKeyOverflowError)?;

*entry
.insert_with_hasher(hash, key, (), |key| {
compute_hash(state, get_bytes(storage, key))
})
.0
}
};
self.keys_builder.append_value(key)?;

Ok(key)
}

#[inline]
Expand All @@ -212,12 +249,30 @@ where

/// Builds the `DictionaryArray` and reset this builder.
pub fn finish(&mut self) -> DictionaryArray<K> {
self.map.clear();
self.dedup.clear();
let value_ref: ArrayRef = Arc::new(self.values_builder.finish());
self.keys_builder.finish_dict(value_ref)
}
}

fn compute_hash(hasher: &ahash::RandomState, value: &[u8]) -> u64 {
use std::hash::{BuildHasher, Hash, Hasher};
let mut state = hasher.build_hasher();
value.hash(&mut state);
state.finish()
}

fn get_bytes<'a, K: ArrowNativeType>(values: &'a StringBuilder, key: &K) -> &'a [u8] {
let offsets = values.offsets_slice();
let values = values.values_slice();

let idx = key.to_usize().unwrap();
let end_offset = offsets[idx + 1].to_usize().unwrap();
let start_offset = offsets[idx].to_usize().unwrap();

&values[start_offset..end_offset]
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 903b24a

Please sign in to comment.