From 75f4945bbb852b2c49df3b7c4b8071579a4f381f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 13 Jun 2022 12:33:41 +0100 Subject: [PATCH] Faster StringDictionaryBuilder (#1851) --- arrow/Cargo.toml | 2 + arrow/benches/string_dictionary_builder.rs | 70 +++++++++ .../src/array/builder/generic_list_builder.rs | 10 ++ .../array/builder/generic_string_builder.rs | 10 ++ arrow/src/array/builder/primitive_builder.rs | 5 + .../builder/string_dictionary_builder.rs | 141 ++++++++++++------ 6 files changed, 195 insertions(+), 43 deletions(-) create mode 100644 arrow/benches/string_dictionary_builder.rs diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 944dda9ebcc..6b431f07181 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -38,6 +38,7 @@ path = "src/lib.rs" bench = false [dependencies] +ahash = { version = "0.7", default-features = false } serde = { version = "1.0", default-features = false } serde_derive = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false, features = ["preserve_order"] } @@ -45,6 +46,7 @@ indexmap = { version = "1.9", default-features = false, features = ["std"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true } num = { version = "0.4", default-features = false, features = ["std"] } half = { version = "2.0", default-features = false } +hashbrown = { version = "0.12", default-features = false } csv_crate = { version = "1.1", default-features = false, optional = true, package="csv" } regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] } lazy_static = { version = "1.4", default-features = false } diff --git a/arrow/benches/string_dictionary_builder.rs b/arrow/benches/string_dictionary_builder.rs new file mode 100644 index 00000000000..bc014bec155 --- /dev/null +++ b/arrow/benches/string_dictionary_builder.rs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or 
more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Int32Builder, StringBuilder, StringDictionaryBuilder}; +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::{thread_rng, Rng}; + +/// Note: this is best effort, not all keys are necessarily present or unique +fn build_strings(dict_size: usize, total_size: usize, key_len: usize) -> Vec { + let mut rng = thread_rng(); + let values: Vec = (0..dict_size) + .map(|_| (0..key_len).map(|_| rng.gen::()).collect()) + .collect(); + + (0..total_size) + .map(|_| values[rng.gen_range(0..dict_size)].clone()) + .collect() +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("string_dictionary_builder"); + + let mut do_bench = |dict_size: usize, total_size: usize, key_len: usize| { + group.bench_function( + format!( + "(dict_size:{}, len:{}, key_len: {})", + dict_size, total_size, key_len + ), + |b| { + let strings = build_strings(dict_size, total_size, key_len); + b.iter(|| { + let keys = Int32Builder::new(strings.len()); + let values = StringBuilder::new((key_len + 1) * dict_size); + let mut builder = StringDictionaryBuilder::new(keys, values); + + for val in &strings { + builder.append(val).unwrap(); + } + + builder.finish(); + }) + }, + ); + }; + + do_bench(20, 1000, 5); + 
do_bench(100, 1000, 5); + do_bench(100, 1000, 10); + do_bench(100, 10000, 10); + do_bench(100, 10000, 100); + + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/arrow/src/array/builder/generic_list_builder.rs b/arrow/src/array/builder/generic_list_builder.rs index 1449b5c09cc..d1333b7bf70 100644 --- a/arrow/src/array/builder/generic_list_builder.rs +++ b/arrow/src/array/builder/generic_list_builder.rs @@ -107,6 +107,11 @@ where &mut self.values_builder } + /// Returns the child array builder as an immutable reference + pub fn values_ref(&self) -> &T { + &self.values_builder + } + /// Finish the current variable-length list array slot #[inline] pub fn append(&mut self, is_valid: bool) -> Result<()> { @@ -152,6 +157,11 @@ where GenericListArray::::from(array_data) } + + /// Returns the current offsets buffer as a slice + pub fn offsets_slice(&self) -> &[OffsetSize] { + self.offsets_builder.as_slice() + } } #[cfg(test)] diff --git a/arrow/src/array/builder/generic_string_builder.rs b/arrow/src/array/builder/generic_string_builder.rs index ee391c4d4f8..c09ffd66e84 100644 --- a/arrow/src/array/builder/generic_string_builder.rs +++ b/arrow/src/array/builder/generic_string_builder.rs @@ -87,6 +87,16 @@ impl GenericStringBuilder { pub fn finish(&mut self) -> GenericStringArray { GenericStringArray::::from(self.builder.finish()) } + + /// Returns the current values buffer as a slice + pub fn values_slice(&self) -> &[u8] { + self.builder.values_ref().values_slice() + } + + /// Returns the current offsets buffer as a slice + pub fn offsets_slice(&self) -> &[OffsetSize] { + self.builder.offsets_slice() + } } impl ArrayBuilder for GenericStringBuilder { diff --git a/arrow/src/array/builder/primitive_builder.rs b/arrow/src/array/builder/primitive_builder.rs index 83c62509cfb..b18239b7547 100644 --- a/arrow/src/array/builder/primitive_builder.rs +++ b/arrow/src/array/builder/primitive_builder.rs @@ -230,6 +230,11 @@ 
impl PrimitiveBuilder { b.append_n(self.values_builder.len(), true); self.bitmap_builder = Some(b); } + + /// Returns the current values buffer as a slice + pub fn values_slice(&self) -> &[T::Native] { + self.values_builder.as_slice() + } } #[cfg(test)] diff --git a/arrow/src/array/builder/string_dictionary_builder.rs b/arrow/src/array/builder/string_dictionary_builder.rs index d1b872fd313..918bf975692 100644 --- a/arrow/src/array/builder/string_dictionary_builder.rs +++ b/arrow/src/array/builder/string_dictionary_builder.rs @@ -15,21 +15,17 @@ // specific language governing permissions and limitations // under the License. +use super::PrimitiveBuilder; +use crate::array::{ + Array, ArrayBuilder, ArrayRef, DictionaryArray, StringArray, StringBuilder, +}; +use crate::datatypes::{ArrowDictionaryKeyType, ArrowNativeType}; +use crate::error::{ArrowError, Result}; +use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; use std::any::Any; -use std::collections::HashMap; use std::sync::Arc; -use crate::array::array::Array; -use crate::array::ArrayBuilder; -use crate::array::ArrayRef; -use crate::array::ArrowDictionaryKeyType; -use crate::array::DictionaryArray; -use crate::array::PrimitiveBuilder; -use crate::array::StringArray; -use crate::array::StringBuilder; -use crate::datatypes::ArrowNativeType; -use crate::error::{ArrowError, Result}; - /// Array builder for `DictionaryArray` that stores Strings. For example to map a set of byte indices /// to String values. Note that the use of a `HashMap` here will not scale to very large /// arrays or result in an ordered dictionary. 
@@ -76,9 +72,16 @@ pub struct StringDictionaryBuilder where K: ArrowDictionaryKeyType, { + state: ahash::RandomState, + /// Used to provide a lookup from string value to key type + /// + /// Note: K's hash implementation is not used, instead the raw entry + /// API is used to store keys w.r.t the hash of the strings themselves + /// + dedup: HashMap, + keys_builder: PrimitiveBuilder, values_builder: StringBuilder, - map: HashMap, K::Native>, } impl StringDictionaryBuilder @@ -88,9 +91,10 @@ where /// Creates a new `StringDictionaryBuilder` from a keys builder and a value builder. pub fn new(keys_builder: PrimitiveBuilder, values_builder: StringBuilder) -> Self { Self { + state: Default::default(), + dedup: HashMap::with_capacity_and_hasher(keys_builder.capacity(), ()), keys_builder, values_builder, - map: HashMap::new(), } } @@ -122,27 +126,44 @@ where keys_builder: PrimitiveBuilder, dictionary_values: &StringArray, ) -> Result { + let state = ahash::RandomState::default(); let dict_len = dictionary_values.len(); - let mut values_builder = - StringBuilder::with_capacity(dict_len, dictionary_values.value_data().len()); - let mut map: HashMap, K::Native> = HashMap::with_capacity(dict_len); - for i in 0..dict_len { - if dictionary_values.is_valid(i) { - let value = dictionary_values.value(i); - map.insert( - value.as_bytes().into(), - K::Native::from_usize(i) - .ok_or(ArrowError::DictionaryKeyOverflowError)?, - ); - values_builder.append_value(value)?; - } else { - values_builder.append_null()?; + + let mut dedup = HashMap::with_capacity_and_hasher(dict_len, ()); + + let values_len = dictionary_values.value_data().len(); + let mut values_builder = StringBuilder::with_capacity(dict_len, values_len); + + for (idx, maybe_value) in dictionary_values.iter().enumerate() { + match maybe_value { + Some(value) => { + let hash = compute_hash(&state, value.as_bytes()); + + let key = K::Native::from_usize(idx) + .ok_or(ArrowError::DictionaryKeyOverflowError)?; + + let entry = + 
dedup.raw_entry_mut().from_hash(hash, |key: &K::Native| { + value.as_bytes() == get_bytes(&values_builder, key) + }); + + if let RawEntryMut::Vacant(v) = entry { + v.insert_with_hasher(hash, key, (), |key| { + compute_hash(&state, get_bytes(&values_builder, key)) + }); + } + + values_builder.append_value(value)?; + } + None => values_builder.append_null()?, } } + Ok(Self { + state, + dedup, keys_builder, values_builder, - map, }) } } @@ -190,19 +211,35 @@ where /// if already present in the values array or a new index if the /// value is appended to the values array. pub fn append(&mut self, value: impl AsRef) -> Result { - if let Some(&key) = self.map.get(value.as_ref().as_bytes()) { - // Append existing value. - self.keys_builder.append_value(key)?; - Ok(key) - } else { - // Append new value. - let key = K::Native::from_usize(self.values_builder.len()) - .ok_or(ArrowError::DictionaryKeyOverflowError)?; - self.values_builder.append_value(value.as_ref())?; - self.keys_builder.append_value(key as K::Native)?; - self.map.insert(value.as_ref().as_bytes().into(), key); - Ok(key) - } + let value = value.as_ref(); + + let state = &self.state; + let storage = &mut self.values_builder; + let hash = compute_hash(state, value.as_bytes()); + + let entry = self + .dedup + .raw_entry_mut() + .from_hash(hash, |key| value.as_bytes() == get_bytes(storage, key)); + + let key = match entry { + RawEntryMut::Occupied(entry) => *entry.into_key(), + RawEntryMut::Vacant(entry) => { + // Validate the key before mutating storage so a key overflow leaves the builder unchanged + let key = K::Native::from_usize(storage.len()) + .ok_or(ArrowError::DictionaryKeyOverflowError)?; + storage.append_value(value)?; + + *entry + .insert_with_hasher(hash, key, (), |key| { + compute_hash(state, get_bytes(storage, key)) + }) + .0 + } + }; + self.keys_builder.append_value(key)?; + + Ok(key) } #[inline] @@ -212,12 +249,30 @@ where /// Builds the `DictionaryArray` and reset this builder. 
pub fn finish(&mut self) -> DictionaryArray { - self.map.clear(); + self.dedup.clear(); let value_ref: ArrayRef = Arc::new(self.values_builder.finish()); self.keys_builder.finish_dict(value_ref) } } +fn compute_hash(hasher: &ahash::RandomState, value: &[u8]) -> u64 { + use std::hash::{BuildHasher, Hash, Hasher}; + let mut state = hasher.build_hasher(); + value.hash(&mut state); + state.finish() +} + +fn get_bytes<'a, K: ArrowNativeType>(values: &'a StringBuilder, key: &K) -> &'a [u8] { + let offsets = values.offsets_slice(); + let values = values.values_slice(); + + let idx = key.to_usize().unwrap(); + let end_offset = offsets[idx + 1].to_usize().unwrap(); + let start_offset = offsets[idx].to_usize().unwrap(); + + &values[start_offset..end_offset] +} + #[cfg(test)] mod tests { use super::*;