Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add COW conversion for Buffer and PrimitiveArray and unary_mut #3115

Merged
merged 11 commits into from
Nov 17, 2022
12 changes: 12 additions & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::cast::downcast_array;
use arrow_schema::Field;

#[test]
Expand Down Expand Up @@ -1113,4 +1114,15 @@ mod tests {
assert!(compute_my_thing(&arr));
assert!(compute_my_thing(arr.as_ref()));
}

#[test]
fn test_downcast_array() {
    // Both the input and the expected value are built the same way.
    let make = || -> Int32Array { vec![1, 2, 3].into_iter().map(Some).collect() };

    // Erase the concrete type, then recover it through `downcast_array`.
    let erased: ArrayRef = Arc::new(make());
    let recovered: Int32Array = downcast_array(&erased);

    assert_eq!(recovered, make());
}
}
161 changes: 160 additions & 1 deletion arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,36 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
unsafe { build_primitive_array(len, buffer, null_count, null_buffer) }
}

/// Applies an unary and infallible function to a mutable primitive array.
/// Mutable primitive array means that the buffer is not shared with other arrays.
/// As a result, this mutates the buffer directly without allocating new buffer.
///
/// # Implementation
///
/// This will apply the function for all values, including those on null slots.
/// This implies that the operation must be infallible for any value of the corresponding type
/// or this function may panic.
/// # Example
/// ```rust
/// # use arrow_array::{Int32Array, types::Int32Type};
/// # fn main() {
/// let array = Int32Array::from(vec![Some(5), Some(7), None]);
/// let c = array.unary_mut(|x| x * 2 + 1).unwrap();
/// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
/// # }
/// ```
pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>, PrimitiveArray<T>>
tustvold marked this conversation as resolved.
Show resolved Hide resolved
where
F: Fn(T::Native) -> T::Native,
{
let mut builder = self.into_builder()?;
builder
tustvold marked this conversation as resolved.
Show resolved Hide resolved
.values_slice_mut()
.iter_mut()
.for_each(|v| *v = op(*v));
Ok(builder.finish())
}

/// Applies a unary and fallible function to all valid values in a primitive array
///
/// This is unlike [`Self::unary`] which will apply an infallible function to all rows
Expand Down Expand Up @@ -489,6 +519,66 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
)
}
}

/// Converts this array into a `PrimitiveBuilder` so that its values can be mutated in place.
///
/// Returns `Ok(builder)` when the null buffer (if any) and the values buffer could both be
/// converted with `into_mutable`. Otherwise returns `Err` holding an array rebuilt from the
/// same (already sliced) buffers and the same length.
pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> {
let len = self.len();
// Restrict the null bitmap to this array's window: `len` bits starting at the array offset.
let null_bit_buffer = self
.data
.null_buffer()
.map(|b| b.bit_slice(self.data.offset(), len));

// Restrict the values buffer to the same window, expressed in bytes.
let element_len = std::mem::size_of::<T::Native>();
let buffer = self.data.buffers()[0]
.slice_with_length(self.data.offset() * element_len, len * element_len);

// NOTE(review): presumably releases the array's own handle on the buffers so that the
// slices taken above can become the only owners -- confirm against `into_mutable`.
drop(self.data);

let try_mutable_null_buffer = match null_bit_buffer {
None => Ok(None),
Some(null_buffer) => {
// Null buffer exists, tries to make it mutable
null_buffer.into_mutable().map(Some)
}
};

let try_mutable_buffers = match try_mutable_null_buffer {
Ok(mutable_null_buffer) => {
// Got mutable null buffer, tries to get mutable value buffer
let try_mutable_buffer = buffer.into_mutable();

// try_mutable_buffer.map(...).map_err(...) doesn't work as the compiler complains
// mutable_null_buffer is moved into map closure.
match try_mutable_buffer {
Ok(mutable_buffer) => Ok(PrimitiveBuilder::<T>::new_from_buffer(
mutable_buffer,
mutable_null_buffer,
)),
// Values buffer could not be made mutable: hand both buffers back, converting the
// already-mutable null buffer with `into()`.
Err(buffer) => Err((buffer, mutable_null_buffer.map(|b| b.into()))),
}
}
Err(mutable_null_buffer) => {
// Unable to get mutable null buffer; despite its name, the value bound here is the
// buffer handed back by the failed `into_mutable` call.
Err((buffer, Some(mutable_null_buffer)))
}
};

match try_mutable_buffers {
Ok(builder) => Ok(builder),
Err((buffer, null_bit_buffer)) => {
// Reassemble an array from the returned buffers; no offset is set on the builder here.
let builder = ArrayData::builder(T::DATA_TYPE)
.len(len)
.add_buffer(buffer)
.null_bit_buffer(null_bit_buffer);

// NOTE(review): `build_unchecked` skips validation; this relies on `buffer`,
// `null_bit_buffer` and `len` all originating from the array being converted -- confirm.
let array_data = unsafe { builder.build_unchecked() };
let array = PrimitiveArray::<T>::from(array_data);

Err(array)
}
}
}
}

#[inline]
Expand Down Expand Up @@ -1036,7 +1126,9 @@ impl<T: DecimalType + ArrowPrimitiveType> PrimitiveArray<T> {
mod tests {
use super::*;
use crate::builder::{Decimal128Builder, Decimal256Builder};
use crate::BooleanArray;
use crate::cast::downcast_array;
use crate::{ArrayRef, BooleanArray};
use std::sync::Arc;

#[test]
fn test_primitive_array_from_vec() {
Expand Down Expand Up @@ -1939,4 +2031,71 @@ mod tests {

array.value(4);
}

#[test]
fn test_into_builder() {
    let source: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

    // Go through a type-erased reference and release it before converting.
    let erased: ArrayRef = Arc::new(source);
    let col: Int32Array = downcast_array(&erased);
    drop(erased);

    let mut builder = col.into_builder().unwrap();

    let values = builder.values_slice_mut();
    assert_eq!(values, &[1, 2, 3]);

    // Overwrite all three slots through the mutable slice.
    values.copy_from_slice(&[4, 2, 1]);

    let rebuilt = builder.finish();
    let expected: Int32Array = vec![Some(4), Some(2), Some(1)].into_iter().collect();
    assert_eq!(expected, rebuilt);
}

#[test]
fn test_into_builder_cloned_array() {
    let source: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
    let shared: ArrayRef = Arc::new(source);

    // `shared` is still alive while the array made from its cloned data is converted.
    let col: Int32Array = PrimitiveArray::<Int32Type>::from(shared.data().clone());

    let returned = match col.into_builder() {
        Ok(_) => panic!("Should not get builder from cloned array"),
        Err(array) => array,
    };

    // The array handed back must still hold the original values.
    let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
    assert_eq!(expected, returned)
}

#[test]
fn test_into_builder_on_sliced_array() {
    let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

    // `array` stays alive for the whole test; only the slice handle is dropped.
    let sliced = array.slice(1, 2);
    let col: Int32Array = downcast_array(&sliced);
    drop(sliced);

    let outcome = col.into_builder();
    outcome.expect_err("Should not build builder from sliced array");
}

#[test]
fn test_unary_mut() {
    // The same operation is applied in both scenarios.
    let double_plus_one = |x: i32| x * 2 + 1;

    // All slots valid.
    let all_valid: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
    let c = all_valid.unary_mut(double_plus_one).unwrap();
    let expected: Int32Array = vec![3, 5, 7].into_iter().map(Some).collect();
    assert_eq!(expected, c);

    // A null slot stays null in the result.
    let with_null: Int32Array = Int32Array::from(vec![Some(5), Some(7), None]);
    let c = with_null.unary_mut(double_plus_one).unwrap();
    assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
}
}
5 changes: 5 additions & 0 deletions arrow-array/src/builder/boolean_buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ impl BooleanBufferBuilder {
Self { buffer, len: 0 }
}

/// Creates a builder that wraps an existing `buffer` and starts with a length of `len` bits.
///
/// # Panics
///
/// Panics if `len` is greater than the number of bits in `buffer` (`buffer.len() * 8`).
pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self {
assert!(len <= buffer.len() * 8);
Self { buffer, len }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to assert that len <= buffer.len() * 8

}

#[inline]
pub fn len(&self) -> usize {
self.len
Expand Down
9 changes: 9 additions & 0 deletions arrow-array/src/builder/buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ impl<T: ArrowNativeType> BufferBuilder<T> {
}
}

/// Creates a builder that wraps an existing `buffer`.
///
/// The element count is the buffer's byte length divided by `size_of::<T>()`
/// (integer division).
pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
    let len = buffer.len() / std::mem::size_of::<T>();
    Self {
        buffer,
        len,
        _marker: PhantomData,
    }
}

/// Returns the current number of array elements in the internal buffer.
///
/// # Example:
Expand Down
25 changes: 24 additions & 1 deletion arrow-array/src/builder/null_buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::builder::BooleanBufferBuilder;
use arrow_buffer::Buffer;
use arrow_buffer::{Buffer, MutableBuffer};

/// Builder for creating the null bit buffer.
/// This builder only materializes the buffer when we append `false`.
Expand All @@ -42,6 +42,29 @@ impl NullBufferBuilder {
}
}

/// Creates a new builder whose length and capacity are both `len`,
/// without materializing a bitmap.
pub fn new_with_len(len: usize) -> Self {
    let capacity = len;
    Self {
        bitmap_builder: None,
        len,
        capacity,
    }
}

/// Creates a new builder from a `MutableBuffer` holding a bitmap of `len` bits.
///
/// The builder's capacity is the number of bits in `buffer` (`buffer.len() * 8`).
///
/// # Panics
///
/// Panics if `len` is greater than the number of bits in `buffer`.
pub fn new_from_buffer(buffer: MutableBuffer, len: usize) -> Self {
    let capacity = buffer.len() * 8;

    // An exactly full bitmap (`len == capacity`, e.g. 8 slots in a one-byte
    // buffer) is valid, so the bound is inclusive. This matches the check
    // performed by `BooleanBufferBuilder::new_from_buffer`.
    assert!(len <= capacity);

    let bitmap_builder = Some(BooleanBufferBuilder::new_from_buffer(buffer, len));
    Self {
        bitmap_builder,
        len,
        capacity,
    }
}

/// Appends `n` `true`s into the builder
/// to indicate that these `n` items are not nulls.
#[inline]
Expand Down
24 changes: 24 additions & 0 deletions arrow-array/src/builder/primitive_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::builder::null_buffer_builder::NullBufferBuilder;
use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::types::*;
use crate::{ArrayRef, ArrowPrimitiveType, PrimitiveArray};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayData;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -114,6 +115,24 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
}
}

/// Creates a builder that reuses `values_buffer` for the values and, when present,
/// `null_buffer` for the null bitmap.
///
/// The null buffer builder is given the same length as the values builder created
/// from `values_buffer`.
pub fn new_from_buffer(
    values_buffer: MutableBuffer,
    null_buffer: Option<MutableBuffer>,
) -> Self {
    let values_builder = BufferBuilder::<T::Native>::new_from_buffer(values_buffer);
    let len = values_builder.len();

    let null_buffer_builder = match null_buffer {
        Some(buffer) => NullBufferBuilder::new_from_buffer(buffer, len),
        None => NullBufferBuilder::new_with_len(len),
    };

    Self {
        values_builder,
        null_buffer_builder,
    }
}

/// Returns the capacity of this builder measured in slots of type `T`
pub fn capacity(&self) -> usize {
self.values_builder.capacity()
Expand Down Expand Up @@ -204,6 +223,11 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
pub fn values_slice(&self) -> &[T::Native] {
self.values_builder.as_slice()
}

/// Returns the current values buffer as a mutable slice.
///
/// Delegates to the values builder's `as_slice_mut`; the null buffer builder is not touched.
pub fn values_slice_mut(&mut self) -> &mut [T::Native] {
self.values_builder.as_slice_mut()
}
}

#[cfg(test)]
Expand Down
19 changes: 19 additions & 0 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,25 @@ impl Buffer {
pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
}

/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
/// Returns `Err` if this is shared or its allocation is from an external source.
///
/// # Panics
///
/// Panics if the pointer returned by `as_ptr` is not equal to the pointer of the
/// underlying bytes once sole ownership of them has been obtained (see the
/// `assert_eq!` in the body).
pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
// Captured up front because `self.data` is moved into `Arc::try_unwrap` below.
let offset_ptr = self.as_ptr();
let offset = self.offset;
let length = self.length;
Arc::try_unwrap(self.data)
.and_then(|bytes| {
// The pointer of underlying buffer should not be offset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a somewhat annoying limitation, I wonder if there is some way to avoid it 🤔

Perhaps we could push the offset into Bytes 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.

Wondering pushing the offset into Bytes can change it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah ideally we'd do something like make MutableArray hold a Bytes, and then push an offset into Bytes. We could even extend the Allocation trait to allow reallocation of custom allocated data. Not sure how much, if any, of that you would be interested in doing 😅 was more just an observation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.

Just question: i think Arc::try_unwrap has check the exactly one strong reference.
And all sliced use the clone(), So this is impossible panic right? 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clone on an Arc increments the strong count, it does not perform a deep copy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tustvold Sorry another question: I think in datafusion most PrimitiveArray data have only one reference during query. So base on this, most unary change using unary_mut like +1, will avoid memcpy than before using unary ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would likely need to update the kernels to make use of this, but theoretically. Whether this will make a major difference in practice I'm not sure, sorts, aggregates and joins, tend to dominate queries in my experience

assert_eq!(offset_ptr, bytes.ptr().as_ptr());
// If `from_bytes` rejects the bytes, wrap them in a fresh `Arc` so the error type
// matches the one produced by `Arc::try_unwrap`.
MutableBuffer::from_bytes(bytes).map_err(Arc::new)
})
// On failure, reassemble a `Buffer` with the original offset and length.
.map_err(|bytes| Buffer {
data: bytes,
offset,
length,
})
}
}

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
Expand Down
19 changes: 19 additions & 0 deletions arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
native::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
use std::mem;
use std::ptr::NonNull;

/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
Expand Down Expand Up @@ -92,6 +93,24 @@ impl MutableBuffer {
}
}

/// Builds a [MutableBuffer] that takes over the pointer, length and capacity of `bytes`.
///
/// Only `Bytes` whose deallocation is `Deallocation::Arrow` are accepted; any other
/// kind is handed back unchanged as `Err`.
pub(crate) fn from_bytes(bytes: Bytes) -> Result<Self, Bytes> {
    match bytes.deallocation() {
        Deallocation::Arrow(_) => {}
        _ => return Err(bytes),
    }

    let buffer = Self {
        data: bytes.ptr(),
        len: bytes.len(),
        capacity: bytes.capacity(),
    };
    // The new buffer now refers to the same pointer, so `bytes` must not run its
    // destructor.
    mem::forget(bytes);

    Ok(buffer)
}

/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
/// This is useful to create a buffer for packed bitmaps.
pub fn new_null(len: usize) -> Self {
Expand Down
5 changes: 5 additions & 0 deletions arrow-buffer/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl Bytes {
Deallocation::Custom(_) => 0,
}
}

/// Returns a reference to the `deallocation` field of this `Bytes`.
#[inline]
pub(crate) fn deallocation(&self) -> &Deallocation {
&self.deallocation
}
}

// Deallocation is Send + Sync, repeating the bound here makes that refactoring safe
Expand Down