Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add COW conversion for Buffer and PrimitiveArray and unary_mut #3115

Merged
merged 11 commits into from
Nov 17, 2022
12 changes: 12 additions & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::cast::downcast_array;
use arrow_schema::Field;

#[test]
Expand Down Expand Up @@ -1113,4 +1114,15 @@ mod tests {
assert!(compute_my_thing(&arr));
assert!(compute_my_thing(arr.as_ref()));
}

#[test]
fn test_downcast_array() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let boxed: ArrayRef = Arc::new(array);
let array: Int32Array = downcast_array(&boxed);

let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
assert_eq!(array, expected);
}
}
143 changes: 142 additions & 1 deletion arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,61 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
unsafe { build_primitive_array(len, buffer, null_count, null_buffer) }
}

/// Applies an unary and infallible function to a mutable primitive array.
/// Mutable primitive array means that the buffer is not shared with other arrays.
/// As a result, this mutates the buffer directly without allocating new buffer.
///
/// # Implementation
///
/// This will apply the function for all values, including those on null slots.
/// This implies that the operation must be infallible for any value of the corresponding type
/// or this function may panic.
/// # Example
/// ```rust
/// # use arrow_array::{Int32Array, types::Int32Type};
/// # fn main() {
/// let array = Int32Array::from(vec![Some(5), Some(7), None]);
/// let c = array.unary_mut(|x| x * 2 + 1).unwrap();
/// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
/// # }
/// ```
pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>, PrimitiveArray<T>>
tustvold marked this conversation as resolved.
Show resolved Hide resolved
where
F: Fn(T::Native) -> T::Native,
{
let data = self.data();
let len = self.len();
let null_count = self.null_count();
let null_buffer = data.null_buffer().map(|b| b.bit_slice(data.offset(), len));

let buffer = self.data.buffers()[0].clone();
Copy link
Contributor

@tustvold tustvold Nov 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need call slice_with_length to apply the offset, something like

let element_len = std::mem::size_of::<T::Native>();
let buffer = self.data.buffers()[0].slice_with_length(data.offset() * element_len, len * element_len)

let buffer_len = buffer.len();

drop(self.data);

let mutable_buffer = buffer.into_mutable(buffer_len);

match mutable_buffer {
Ok(mut mutable_buffer) => {
mutable_buffer
.typed_data_mut()
.iter_mut()
.for_each(|l| *l = op(*l));
Ok(unsafe {
build_primitive_array(
len,
mutable_buffer.into(),
null_count,
null_buffer,
)
})
}
Err(buffer) => Err(unsafe {
build_primitive_array(len, buffer, null_count, null_buffer)
}),
}
}

/// Applies a unary and fallible function to all valid values in a primitive array
///
/// This is unlike [`Self::unary`] which will apply an infallible function to all rows
Expand Down Expand Up @@ -489,6 +544,42 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
)
}
}

/// Returns `PrimitiveBuilder` of this primitive array for mutating its values if the underlying
/// data buffer is not shared by others.
pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> {
let null_buffer = self
.data
.null_buffer()
.cloned()
.and_then(|b| b.into_mutable(0).ok());

let len = self.len();
let null_bit_buffer = self.data.null_buffer().cloned();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to bit_slice this to be consistent with the values buffer


let buffer = self.data.buffers()[0].clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing here, I think this needs to call slice_with_length

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to slice_with_length but I'm wondering what's difference than clone directly here?


drop(self.data);

let builder = buffer
.into_mutable(0)
.map(|buffer| PrimitiveBuilder::<T>::new_from_buffer(buffer, null_buffer));

match builder {
Ok(builder) => Ok(builder),
Err(buffer) => {
let builder = ArrayData::builder(T::DATA_TYPE)
.len(len)
.add_buffer(buffer)
.null_bit_buffer(null_bit_buffer);

let array_data = unsafe { builder.build_unchecked() };
let array = PrimitiveArray::<T>::from(array_data);

Err(array)
}
}
}
}

#[inline]
Expand Down Expand Up @@ -1036,7 +1127,9 @@ impl<T: DecimalType + ArrowPrimitiveType> PrimitiveArray<T> {
mod tests {
use super::*;
use crate::builder::{Decimal128Builder, Decimal256Builder};
use crate::BooleanArray;
use crate::cast::downcast_array;
use crate::{ArrayRef, BooleanArray};
use std::sync::Arc;

#[test]
fn test_primitive_array_from_vec() {
Expand Down Expand Up @@ -1939,4 +2032,52 @@ mod tests {

array.value(4);
}

#[test]
fn test_into_builder() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let boxed: ArrayRef = Arc::new(array);
let col: Int32Array = downcast_array(&boxed);
drop(boxed);

let mut builder = col.into_builder().unwrap();

builder.append_value(4);
builder.append_null();
builder.append_value(2);

let expected: Int32Array = vec![Some(4), None, Some(2)].into_iter().collect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have expected into_builder to keep the current values?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, currently the returned builder sets length to 0 so it will overwrite the existing values. Per the latest suggestion, it makes more sense to have the same length as the array. And we can ask for mutable slice if we want to mutate the existing values.


let new_array = builder.finish();
assert_eq!(expected, new_array);
}

#[test]
fn test_into_builder_cloned_array() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let boxed: ArrayRef = Arc::new(array);

let col: Int32Array = PrimitiveArray::<Int32Type>::from(boxed.data().clone());
let err = col.into_builder();

match err {
Ok(_) => panic!("Should not get builder from cloned array"),
Err(returned) => {
let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
assert_eq!(expected, returned)
}
}
}

#[test]
fn test_unary_mut() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let c = array.unary_mut(|x| x * 2 + 1).unwrap();
let expected: Int32Array = vec![3, 5, 7].into_iter().map(Some).collect();

assert_eq!(expected, c);
}
}
4 changes: 4 additions & 0 deletions arrow-array/src/builder/boolean_buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl BooleanBufferBuilder {
Self { buffer, len: 0 }
}

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this need to be provided with the length in bits?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, missed it.

Self { buffer, len: 0 }
}

#[inline]
pub fn len(&self) -> usize {
self.len
Expand Down
8 changes: 8 additions & 0 deletions arrow-array/src/builder/buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ impl<T: ArrowNativeType> BufferBuilder<T> {
}
}

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
Self {
buffer,
len: 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
len: 0,
len: buffer.len() / std::mem::size_of::<T::Native>(),

?

_marker: PhantomData,
}
}

/// Returns the current number of array elements in the internal buffer.
///
/// # Example:
Expand Down
12 changes: 11 additions & 1 deletion arrow-array/src/builder/null_buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::builder::BooleanBufferBuilder;
use arrow_buffer::Buffer;
use arrow_buffer::{Buffer, MutableBuffer};

/// Builder for creating the null bit buffer.
/// This builder only materializes the buffer when we append `false`.
Expand All @@ -42,6 +42,16 @@ impl NullBufferBuilder {
}
}

/// Creates a new builder from a `MutableBuffer`.
pub fn new_from_buffer(buffer: MutableBuffer, capacity: usize) -> Self {
let bitmap_builder = Some(BooleanBufferBuilder::new_from_buffer(buffer));
Self {
bitmap_builder,
len: 0,
capacity,
}
}

/// Appends `n` `true`s into the builder
/// to indicate that these `n` items are not nulls.
#[inline]
Expand Down
17 changes: 17 additions & 0 deletions arrow-array/src/builder/primitive_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::builder::null_buffer_builder::NullBufferBuilder;
use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::types::*;
use crate::{ArrayRef, ArrowPrimitiveType, PrimitiveArray};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayData;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -114,6 +115,22 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
}
}

pub fn new_from_buffer(
values_buffer: MutableBuffer,
null_buffer: Option<MutableBuffer>,
) -> Self {
let capacity = values_buffer.capacity();

let null_buffer_builder = null_buffer
.map(|buffer| NullBufferBuilder::new_from_buffer(buffer, capacity))
.unwrap_or_else(|| NullBufferBuilder::new(capacity));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct, I think it needs to be created with a length of values_builder.len(), as opposed to a capacity based off values_buffer

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, good catch!


Self {
values_builder: BufferBuilder::<T::Native>::new_from_buffer(values_buffer),
null_buffer_builder,
}
}

/// Returns the capacity of this builder measured in slots of type `T`
pub fn capacity(&self) -> usize {
self.values_builder.capacity()
Expand Down
19 changes: 19 additions & 0 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,25 @@ impl Buffer {
pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
}

/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
/// Returns `Err` if this is shared or its allocation is from an external source.
pub fn into_mutable(self, len: usize) -> Result<MutableBuffer, Self> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this take a len? How does this differ from Buffer::length?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #3115 (comment)

let offset_ptr = self.as_ptr();
let offset = self.offset;
let length = self.length;
Arc::try_unwrap(self.data)
.and_then(|bytes| {
// The pointer of underlying buffer should not be offset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a somewhat annoying limitation, I wonder if there is some way to avoid it 🤔

Perhaps we could push the offset into Bytes 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.

Wondering pushing the offset into Bytes can change it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah ideally we'd do something like make MutableArray hold a Bytes, and then push an offset into Bytes. We could even extend the Allocation trait to allow reallocation of custom allocated data. Not sure how much, if any, of that you would be interested in doing 😅 was more just an observation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.

Just question: i think Arc::try_unwrap has check the exactly one strong reference.
And all sliced use the clone(), So this is impossible panic right? 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clone on an Arc increments the strong count, it does not perform a deep copy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tustvold Sorry another question: I think in datafusion most PrimitiveArray data have only one reference during query. So base on this, most unary change using unary_mut like +1, will avoid memcpy than before using unary ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would likely need to update the kernels to make use of this, but theoretically. Whether this will make a major difference in practice I'm not sure, sorts, aggregates and joins, tend to dominate queries in my experience

assert_eq!(offset_ptr, bytes.ptr().as_ptr());
MutableBuffer::from_bytes(bytes, len).map_err(Arc::new)
})
.map_err(|bytes| Buffer {
data: bytes,
offset,
length,
})
}
}

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
Expand Down
18 changes: 18 additions & 0 deletions arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
native::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
use std::mem;
use std::ptr::NonNull;

/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
Expand Down Expand Up @@ -92,6 +93,23 @@ impl MutableBuffer {
}
}

/// Allocates a new [MutableBuffer] from given `Bytes`.
pub(crate) fn from_bytes(bytes: Bytes, len: usize) -> Result<Self, Bytes> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does the provided len different from Bytes::len?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's due to different thoughts about the API. Previously I made the builder starting from length 0. But for unary_mut I want to make a slice with same length as the array before. So this is parameterized.

Now it is changed for your suggestion so this doesn't take a len parameter now.

if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
return Err(bytes);
}

let capacity = bytes.capacity();
let ptr = bytes.ptr();
mem::forget(bytes);

Ok(Self {
data: ptr,
len,
capacity,
})
}

/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
/// This is useful to create a buffer for packed bitmaps.
pub fn new_null(len: usize) -> Self {
Expand Down
5 changes: 5 additions & 0 deletions arrow-buffer/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl Bytes {
Deallocation::Custom(_) => 0,
}
}

#[inline]
pub(crate) fn deallocation(&self) -> &Deallocation {
&self.deallocation
}
}

// Deallocation is Send + Sync, repeating the bound here makes that refactoring safe
Expand Down