diff --git a/arrow-flight/src/sql/google.protobuf.rs b/arrow-flight/src/sql/google.protobuf.rs new file mode 100644 index 00000000000..e69de29bb2d diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 944dda9ebcc..6b431f07181 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -38,6 +38,7 @@ path = "src/lib.rs" bench = false [dependencies] +ahash = { version = "0.7", default-features = false } serde = { version = "1.0", default-features = false } serde_derive = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false, features = ["preserve_order"] } @@ -45,6 +46,7 @@ indexmap = { version = "1.9", default-features = false, features = ["std"] } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true } num = { version = "0.4", default-features = false, features = ["std"] } half = { version = "2.0", default-features = false } +hashbrown = { version = "0.12", default-features = false } csv_crate = { version = "1.1", default-features = false, optional = true, package="csv" } regex = { version = "1.5.6", default-features = false, features = ["std", "unicode"] } lazy_static = { version = "1.4", default-features = false } diff --git a/arrow/benches/string_dictionary_builder.rs b/arrow/benches/string_dictionary_builder.rs new file mode 100644 index 00000000000..bc014bec155 --- /dev/null +++ b/arrow/benches/string_dictionary_builder.rs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{Int32Builder, StringBuilder, StringDictionaryBuilder}; +use criterion::{criterion_group, criterion_main, Criterion}; +use rand::{thread_rng, Rng}; + +/// Note: this is best effort, not all keys are necessarily present or unique +fn build_strings(dict_size: usize, total_size: usize, key_len: usize) -> Vec { + let mut rng = thread_rng(); + let values: Vec = (0..dict_size) + .map(|_| (0..key_len).map(|_| rng.gen::()).collect()) + .collect(); + + (0..total_size) + .map(|_| values[rng.gen_range(0..dict_size)].clone()) + .collect() +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("string_dictionary_builder"); + + let mut do_bench = |dict_size: usize, total_size: usize, key_len: usize| { + group.bench_function( + format!( + "(dict_size:{}, len:{}, key_len: {})", + dict_size, total_size, key_len + ), + |b| { + let strings = build_strings(dict_size, total_size, key_len); + b.iter(|| { + let keys = Int32Builder::new(strings.len()); + let values = StringBuilder::new((key_len + 1) * dict_size); + let mut builder = StringDictionaryBuilder::new(keys, values); + + for val in &strings { + builder.append(val).unwrap(); + } + + builder.finish(); + }) + }, + ); + }; + + do_bench(20, 1000, 5); + do_bench(100, 1000, 5); + do_bench(100, 1000, 10); + do_bench(100, 10000, 10); + do_bench(100, 10000, 100); + + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/arrow/src/array/builder/dictionary.rs b/arrow/src/array/builder/dictionary.rs new file mode 100644 index 00000000000..aca4dae2b66 --- /dev/null +++ b/arrow/src/array/builder/dictionary.rs @@ -0,0 +1,365 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::PrimitiveBuilder; +use crate::array::{ + Array, ArrayBuilder, ArrayRef, DictionaryArray, StringArray, StringBuilder, +}; +use crate::datatypes::{ArrowDictionaryKeyType, ArrowNativeType}; +use crate::error::{ArrowError, Result}; +use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; +use std::any::Any; +use std::sync::Arc; + +/// Array builder for `DictionaryArray` that stores Strings. For example to map a set of byte indices +/// to String values. Note that the use of a `HashMap` here will not scale to very large +/// arrays or result in an ordered dictionary. +/// +/// ``` +/// use arrow::{ +/// array::{ +/// Int8Array, StringArray, +/// PrimitiveBuilder, StringBuilder, StringDictionaryBuilder, +/// }, +/// datatypes::Int8Type, +/// }; +/// +/// // Create a dictionary array indexed by bytes whose values are Strings. +/// // It can thus hold up to 256 distinct string values. +/// +/// let key_builder = PrimitiveBuilder::::new(100); +/// let value_builder = StringBuilder::new(100); +/// let mut builder = StringDictionaryBuilder::new(key_builder, value_builder); +/// +/// // The builder builds the dictionary value by value +/// builder.append("abc").unwrap(); +/// builder.append_null().unwrap(); +/// builder.append("def").unwrap(); +/// builder.append("def").unwrap(); +/// builder.append("abc").unwrap(); +/// let array = builder.finish(); +/// +/// assert_eq!( +/// array.keys(), +/// &Int8Array::from(vec![Some(0), None, Some(1), Some(1), Some(0)]) +/// ); +/// +/// // Values are polymorphic and so require a downcast. +/// let av = array.values(); +/// let ava: &StringArray = av.as_any().downcast_ref::().unwrap(); +/// +/// assert_eq!(ava.value(0), "abc"); +/// assert_eq!(ava.value(1), "def"); +/// +/// ``` +#[derive(Debug)] +pub struct StringDictionaryBuilder +where + K: ArrowDictionaryKeyType, +{ + state: ahash::RandomState, + /// Used to provide a lookup from string value to key type + /// + /// Note: K's hash implementation is not used, instead the raw entry + /// API is used to store keys w.r.t the hash of the strings themselves + /// + dedup: HashMap, + + keys_builder: PrimitiveBuilder, + values_builder: StringBuilder, +} + +impl StringDictionaryBuilder +where + K: ArrowDictionaryKeyType, +{ + /// Creates a new `StringDictionaryBuilder` from a keys builder and a value builder. + pub fn new(keys_builder: PrimitiveBuilder, values_builder: StringBuilder) -> Self { + Self { + state: Default::default(), + dedup: HashMap::with_capacity_and_hasher(keys_builder.capacity(), ()), + keys_builder, + values_builder, + } + } + + /// Creates a new `StringDictionaryBuilder` from a keys builder and a dictionary + /// which is initialized with the given values. + /// The indices of those dictionary values are used as keys. + /// + /// # Example + /// + /// ``` + /// use arrow::datatypes::Int16Type; + /// use arrow::array::{StringArray, StringDictionaryBuilder, PrimitiveBuilder, Int16Array}; + /// use std::convert::TryFrom; + /// + /// let dictionary_values = StringArray::from(vec![None, Some("abc"), Some("def")]); + /// + /// let mut builder = StringDictionaryBuilder::new_with_dictionary(PrimitiveBuilder::::new(3), &dictionary_values).unwrap(); + /// builder.append("def").unwrap(); + /// builder.append_null().unwrap(); + /// builder.append("abc").unwrap(); + /// + /// let dictionary_array = builder.finish(); + /// + /// let keys = dictionary_array.keys(); + /// + /// assert_eq!(keys, &Int16Array::from(vec![Some(2), None, Some(1)])); + /// ``` + pub fn new_with_dictionary( + keys_builder: PrimitiveBuilder, + dictionary_values: &StringArray, + ) -> Result { + let state = ahash::RandomState::default(); + let dict_len = dictionary_values.len(); + + let mut dedup = HashMap::with_capacity_and_hasher(dict_len, ()); + + let values_len = dictionary_values.value_data().len(); + let mut values_builder = StringBuilder::with_capacity(dict_len, values_len); + + for (idx, maybe_value) in dictionary_values.iter().enumerate() { + match maybe_value { + Some(value) => { + let hash = compute_hash(&state, value.as_bytes()); + + let key = K::Native::from_usize(idx) + .ok_or(ArrowError::DictionaryKeyOverflowError)?; + + let entry = + dedup.raw_entry_mut().from_hash(hash, |key: &K::Native| { + value.as_bytes() == get_bytes(&values_builder, key) + }); + + if let RawEntryMut::Vacant(v) = entry { + v.insert_with_hasher(hash, key, (), |key| { + compute_hash(&state, get_bytes(&values_builder, key)) + }); + } + + values_builder.append_value(value)?; + } + None => values_builder.append_null()?, + } + } + + Ok(Self { + state, + dedup, + keys_builder, + values_builder, + }) + } +} + +impl ArrayBuilder for StringDictionaryBuilder +where + K: ArrowDictionaryKeyType, +{ + /// Returns the builder as an non-mutable `Any` reference. + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns the builder as an mutable `Any` reference. + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + /// Returns the boxed builder as a box of `Any`. + fn into_box_any(self: Box) -> Box { + self + } + + /// Returns the number of array slots in the builder + fn len(&self) -> usize { + self.keys_builder.len() + } + + /// Returns whether the number of array slots is zero + fn is_empty(&self) -> bool { + self.keys_builder.is_empty() + } + + /// Builds the array and reset this builder. + fn finish(&mut self) -> ArrayRef { + Arc::new(self.finish()) + } +} + +impl StringDictionaryBuilder +where + K: ArrowDictionaryKeyType, +{ + /// Append a primitive value to the array. Return an existing index + /// if already present in the values array or a new index if the + /// value is appended to the values array. + pub fn append(&mut self, value: impl AsRef) -> Result { + let value = value.as_ref(); + + let state = &self.state; + let storage = &mut self.values_builder; + let hash = compute_hash(state, value.as_bytes()); + + let entry = self + .dedup + .raw_entry_mut() + .from_hash(hash, |key| value.as_bytes() == get_bytes(storage, key)); + + let key = match entry { + RawEntryMut::Occupied(entry) => *entry.into_key(), + RawEntryMut::Vacant(entry) => { + let index = storage.len(); + storage.append_value(value)?; + let key = K::Native::from_usize(index) + .ok_or(ArrowError::DictionaryKeyOverflowError)?; + + *entry + .insert_with_hasher(hash, key, (), |key| { + compute_hash(state, get_bytes(storage, key)) + }) + .0 + } + }; + self.keys_builder.append_value(key)?; + + Ok(key) + } + + #[inline] + pub fn append_null(&mut self) -> Result<()> { + self.keys_builder.append_null() + } + + /// Builds the `DictionaryArray` and reset this builder. + pub fn finish(&mut self) -> DictionaryArray { + self.dedup.clear(); + let value_ref: ArrayRef = Arc::new(self.values_builder.finish()); + self.keys_builder.finish_dict(value_ref) + } +} + +fn compute_hash(hasher: &ahash::RandomState, value: &[u8]) -> u64 { + use std::hash::{BuildHasher, Hash, Hasher}; + let mut state = hasher.build_hasher(); + value.hash(&mut state); + state.finish() +} + +fn get_bytes<'a, K: ArrowNativeType>(values: &'a StringBuilder, key: &K) -> &'a [u8] { + let offsets = values.builder.offsets_builder.as_slice(); + let values = values.builder.values_builder.values_builder.as_slice(); + + let idx = key.to_usize().unwrap(); + let end_offset = offsets[idx + 1].to_usize().unwrap(); + let start_offset = offsets[idx].to_usize().unwrap(); + + &values[start_offset..end_offset] +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::array::Int8Array; + use crate::datatypes::{Int16Type, Int8Type}; + + #[test] + fn test_string_dictionary_builder() { + let key_builder = PrimitiveBuilder::::new(5); + let value_builder = StringBuilder::new(2); + let mut builder = StringDictionaryBuilder::new(key_builder, value_builder); + builder.append("abc").unwrap(); + builder.append_null().unwrap(); + builder.append("def").unwrap(); + builder.append("def").unwrap(); + builder.append("abc").unwrap(); + let array = builder.finish(); + + assert_eq!( + array.keys(), + &Int8Array::from(vec![Some(0), None, Some(1), Some(1), Some(0)]) + ); + + // Values are polymorphic and so require a downcast. + let av = array.values(); + let ava: &StringArray = av.as_any().downcast_ref::().unwrap(); + + assert_eq!(ava.value(0), "abc"); + assert_eq!(ava.value(1), "def"); + } + + #[test] + fn test_string_dictionary_builder_with_existing_dictionary() { + let dictionary = StringArray::from(vec![None, Some("def"), Some("abc")]); + + let key_builder = PrimitiveBuilder::::new(6); + let mut builder = + StringDictionaryBuilder::new_with_dictionary(key_builder, &dictionary) + .unwrap(); + builder.append("abc").unwrap(); + builder.append_null().unwrap(); + builder.append("def").unwrap(); + builder.append("def").unwrap(); + builder.append("abc").unwrap(); + builder.append("ghi").unwrap(); + let array = builder.finish(); + + assert_eq!( + array.keys(), + &Int8Array::from(vec![Some(2), None, Some(1), Some(1), Some(2), Some(3)]) + ); + + // Values are polymorphic and so require a downcast. + let av = array.values(); + let ava: &StringArray = av.as_any().downcast_ref::().unwrap(); + + assert!(!ava.is_valid(0)); + assert_eq!(ava.value(1), "def"); + assert_eq!(ava.value(2), "abc"); + assert_eq!(ava.value(3), "ghi"); + } + + #[test] + fn test_string_dictionary_builder_with_reserved_null_value() { + let dictionary: Vec> = vec![None]; + let dictionary = StringArray::from(dictionary); + + let key_builder = PrimitiveBuilder::::new(4); + let mut builder = + StringDictionaryBuilder::new_with_dictionary(key_builder, &dictionary) + .unwrap(); + builder.append("abc").unwrap(); + builder.append_null().unwrap(); + builder.append("def").unwrap(); + builder.append("abc").unwrap(); + let array = builder.finish(); + + assert!(array.is_null(1)); + assert!(!array.is_valid(1)); + + let keys = array.keys(); + + assert_eq!(keys.value(0), 1); + assert!(keys.is_null(1)); + // zero initialization is currently guaranteed by Buffer allocation and resizing + assert_eq!(keys.value(1), 0); + assert_eq!(keys.value(2), 2); + assert_eq!(keys.value(3), 1); + } +} diff --git a/arrow/src/array/builder/generic_list_builder.rs b/arrow/src/array/builder/generic_list_builder.rs index 1449b5c09cc..d1333b7bf70 100644 --- a/arrow/src/array/builder/generic_list_builder.rs +++ b/arrow/src/array/builder/generic_list_builder.rs @@ -107,6 +107,11 @@ where &mut self.values_builder } + /// Returns the child array builder as an immutable reference + pub fn values_ref(&self) -> &T { + &self.values_builder + } + /// Finish the current variable-length list array slot #[inline] pub fn append(&mut self, is_valid: bool) -> Result<()> { @@ -152,6 +157,11 @@ where GenericListArray::::from(array_data) } + + /// Returns the current offsets buffer as a slice + pub fn offsets_slice(&self) -> &[OffsetSize] { + self.offsets_builder.as_slice() + } } #[cfg(test)] diff --git a/arrow/src/array/builder/generic_string_builder.rs b/arrow/src/array/builder/generic_string_builder.rs index ee391c4d4f8..c09ffd66e84 100644 --- a/arrow/src/array/builder/generic_string_builder.rs +++ b/arrow/src/array/builder/generic_string_builder.rs @@ -87,6 +87,16 @@ impl GenericStringBuilder { pub fn finish(&mut self) -> GenericStringArray { GenericStringArray::::from(self.builder.finish()) } + + /// Returns the current values buffer as a slice + pub fn values_slice(&self) -> &[u8] { + self.builder.values_ref().values_slice() + } + + /// Returns the current offsets buffer as a slice + pub fn offsets_slice(&self) -> &[OffsetSize] { + self.builder.offsets_slice() + } } impl ArrayBuilder for GenericStringBuilder { diff --git a/arrow/src/array/builder/primitive_builder.rs b/arrow/src/array/builder/primitive_builder.rs index 83c62509cfb..b18239b7547 100644 --- a/arrow/src/array/builder/primitive_builder.rs +++ b/arrow/src/array/builder/primitive_builder.rs @@ -230,6 +230,11 @@ impl PrimitiveBuilder { b.append_n(self.values_builder.len(), true); self.bitmap_builder = Some(b); } + + /// Returns the current values buffer as a slice + pub fn values_slice(&self) -> &[T::Native] { + self.values_builder.as_slice() + } } #[cfg(test)] diff --git a/arrow/src/array/builder/string_dictionary_builder.rs b/arrow/src/array/builder/string_dictionary_builder.rs index d1b872fd313..918bf975692 100644 --- a/arrow/src/array/builder/string_dictionary_builder.rs +++ b/arrow/src/array/builder/string_dictionary_builder.rs @@ -15,21 +15,17 @@ // specific language governing permissions and limitations // under the License. +use super::PrimitiveBuilder; +use crate::array::{ + Array, ArrayBuilder, ArrayRef, DictionaryArray, StringArray, StringBuilder, +}; +use crate::datatypes::{ArrowDictionaryKeyType, ArrowNativeType}; +use crate::error::{ArrowError, Result}; +use hashbrown::hash_map::RawEntryMut; +use hashbrown::HashMap; use std::any::Any; -use std::collections::HashMap; use std::sync::Arc; -use crate::array::array::Array; -use crate::array::ArrayBuilder; -use crate::array::ArrayRef; -use crate::array::ArrowDictionaryKeyType; -use crate::array::DictionaryArray; -use crate::array::PrimitiveBuilder; -use crate::array::StringArray; -use crate::array::StringBuilder; -use crate::datatypes::ArrowNativeType; -use crate::error::{ArrowError, Result}; - /// Array builder for `DictionaryArray` that stores Strings. For example to map a set of byte indices /// to String values. Note that the use of a `HashMap` here will not scale to very large /// arrays or result in an ordered dictionary. @@ -76,9 +72,16 @@ pub struct StringDictionaryBuilder where K: ArrowDictionaryKeyType, { + state: ahash::RandomState, + /// Used to provide a lookup from string value to key type + /// + /// Note: K's hash implementation is not used, instead the raw entry + /// API is used to store keys w.r.t the hash of the strings themselves + /// + dedup: HashMap, + keys_builder: PrimitiveBuilder, values_builder: StringBuilder, - map: HashMap, K::Native>, } impl StringDictionaryBuilder @@ -88,9 +91,10 @@ where /// Creates a new `StringDictionaryBuilder` from a keys builder and a value builder. pub fn new(keys_builder: PrimitiveBuilder, values_builder: StringBuilder) -> Self { Self { + state: Default::default(), + dedup: HashMap::with_capacity_and_hasher(keys_builder.capacity(), ()), keys_builder, values_builder, - map: HashMap::new(), } } @@ -122,27 +126,44 @@ where keys_builder: PrimitiveBuilder, dictionary_values: &StringArray, ) -> Result { + let state = ahash::RandomState::default(); let dict_len = dictionary_values.len(); - let mut values_builder = - StringBuilder::with_capacity(dict_len, dictionary_values.value_data().len()); - let mut map: HashMap, K::Native> = HashMap::with_capacity(dict_len); - for i in 0..dict_len { - if dictionary_values.is_valid(i) { - let value = dictionary_values.value(i); - map.insert( - value.as_bytes().into(), - K::Native::from_usize(i) - .ok_or(ArrowError::DictionaryKeyOverflowError)?, - ); - values_builder.append_value(value)?; - } else { - values_builder.append_null()?; + + let mut dedup = HashMap::with_capacity_and_hasher(dict_len, ()); + + let values_len = dictionary_values.value_data().len(); + let mut values_builder = StringBuilder::with_capacity(dict_len, values_len); + + for (idx, maybe_value) in dictionary_values.iter().enumerate() { + match maybe_value { + Some(value) => { + let hash = compute_hash(&state, value.as_bytes()); + + let key = K::Native::from_usize(idx) + .ok_or(ArrowError::DictionaryKeyOverflowError)?; + + let entry = + dedup.raw_entry_mut().from_hash(hash, |key: &K::Native| { + value.as_bytes() == get_bytes(&values_builder, key) + }); + + if let RawEntryMut::Vacant(v) = entry { + v.insert_with_hasher(hash, key, (), |key| { + compute_hash(&state, get_bytes(&values_builder, key)) + }); + } + + values_builder.append_value(value)?; + } + None => values_builder.append_null()?, } } + Ok(Self { + state, + dedup, keys_builder, values_builder, - map, }) } } @@ -190,19 +211,35 @@ where /// if already present in the values array or a new index if the /// value is appended to the values array. pub fn append(&mut self, value: impl AsRef) -> Result { - if let Some(&key) = self.map.get(value.as_ref().as_bytes()) { - // Append existing value. - self.keys_builder.append_value(key)?; - Ok(key) - } else { - // Append new value. - let key = K::Native::from_usize(self.values_builder.len()) - .ok_or(ArrowError::DictionaryKeyOverflowError)?; - self.values_builder.append_value(value.as_ref())?; - self.keys_builder.append_value(key as K::Native)?; - self.map.insert(value.as_ref().as_bytes().into(), key); - Ok(key) - } + let value = value.as_ref(); + + let state = &self.state; + let storage = &mut self.values_builder; + let hash = compute_hash(state, value.as_bytes()); + + let entry = self + .dedup + .raw_entry_mut() + .from_hash(hash, |key| value.as_bytes() == get_bytes(storage, key)); + + let key = match entry { + RawEntryMut::Occupied(entry) => *entry.into_key(), + RawEntryMut::Vacant(entry) => { + let index = storage.len(); + storage.append_value(value)?; + let key = K::Native::from_usize(index) + .ok_or(ArrowError::DictionaryKeyOverflowError)?; + + *entry + .insert_with_hasher(hash, key, (), |key| { + compute_hash(state, get_bytes(storage, key)) + }) + .0 + } + }; + self.keys_builder.append_value(key)?; + + Ok(key) } #[inline] @@ -212,12 +249,30 @@ where /// Builds the `DictionaryArray` and reset this builder. pub fn finish(&mut self) -> DictionaryArray { - self.map.clear(); + self.dedup.clear(); let value_ref: ArrayRef = Arc::new(self.values_builder.finish()); self.keys_builder.finish_dict(value_ref) } } +fn compute_hash(hasher: &ahash::RandomState, value: &[u8]) -> u64 { + use std::hash::{BuildHasher, Hash, Hasher}; + let mut state = hasher.build_hasher(); + value.hash(&mut state); + state.finish() +} + +fn get_bytes<'a, K: ArrowNativeType>(values: &'a StringBuilder, key: &K) -> &'a [u8] { + let offsets = values.offsets_slice(); + let values = values.values_slice(); + + let idx = key.to_usize().unwrap(); + let end_offset = offsets[idx + 1].to_usize().unwrap(); + let start_offset = offsets[idx].to_usize().unwrap(); + + &values[start_offset..end_offset] +} + #[cfg(test)] mod tests { use super::*;