Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add COW conversion for Buffer and PrimitiveArray and unary_mut #3115

Merged
merged 11 commits into from
Nov 17, 2022
12 changes: 12 additions & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::cast::downcast_array;
use arrow_schema::Field;

#[test]
Expand Down Expand Up @@ -1113,4 +1114,15 @@ mod tests {
assert!(compute_my_thing(&arr));
assert!(compute_my_thing(arr.as_ref()));
}

#[test]
fn test_downcast_array() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let boxed: ArrayRef = Arc::new(array);
let array: Int32Array = downcast_array(&boxed);

let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
assert_eq!(array, expected);
}
}
135 changes: 134 additions & 1 deletion arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,54 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
unsafe { build_primitive_array(len, buffer, null_count, null_buffer) }
}

/// Applies an unary and infallible function to a mutable primitive array.
/// Mutable primitive array means that the buffer is not shared with other arrays.
/// As a result, this mutates the buffer directly without allocating new buffer.
///
/// # Implementation
///
/// This will apply the function for all values, including those on null slots.
/// This implies that the operation must be infallible for any value of the corresponding type
/// or this function may panic.
/// # Example
/// ```rust
/// # use arrow_array::{Int32Array, types::Int32Type};
/// # fn main() {
/// let array = Int32Array::from(vec![Some(5), Some(7), None]);
/// let c = array.unary_mut(|x| x * 2 + 1).unwrap();
/// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
/// # }
/// ```
pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>, ArrowError>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should return Self as an error to match the other similar methods?

where
F: Fn(T::Native) -> T::Native,
{
let data = self.data();
let len = self.len();
let null_count = self.null_count();
let null_buffer = data.null_buffer().map(|b| b.bit_slice(data.offset(), len));

let mut buffers = self.data.get_buffers();
let buffer = buffers.remove(0);
let buffer_len = buffer.len();

let mutable_buffer = buffer.into_mutable(buffer_len);

let buffer = match mutable_buffer {
Ok(mut mutable_buffer) => {
mutable_buffer
.typed_data_mut()
.iter_mut()
.for_each(|l| *l = op(*l));
Ok(mutable_buffer.into())
}
Err(_) => Err(ArrowError::InvalidArgumentError(
"Not a mutable array because its buffer is shared.".to_string(),
)),
}?;
Ok(unsafe { build_primitive_array(len, buffer, null_count, null_buffer) })
}

/// Applies a unary and fallible function to all valid values in a primitive array
///
/// This is unlike [`Self::unary`] which will apply an infallible function to all rows
Expand Down Expand Up @@ -489,6 +537,41 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
)
}
}

/// Returns `PrimitiveBuilder` of this primitive array for mutating its values if the underlying
/// data buffer is not shared by others.
pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> {
let null_buffer = self
.data
.null_buffer()
.cloned()
.and_then(|b| b.into_mutable(0).ok());

let len = self.len();
let null_bit_buffer = self.data.null_buffer().cloned();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to bit_slice this to be consistent with the values buffer


let mut buffers = self.data.get_buffers();
let buffer = buffers.remove(0);

let builder = buffer
.into_mutable(0)
.map(|buffer| PrimitiveBuilder::<T>::new_from_buffer(buffer, null_buffer));

match builder {
Ok(builder) => Ok(builder),
Err(buffer) => {
let builder = ArrayData::builder(T::DATA_TYPE)
.len(len)
.add_buffer(buffer)
.null_bit_buffer(null_bit_buffer);

let array_data = unsafe { builder.build_unchecked() };
let array = PrimitiveArray::<T>::from(array_data);

Err(array)
}
}
}
}

#[inline]
Expand Down Expand Up @@ -1036,7 +1119,9 @@ impl<T: DecimalType + ArrowPrimitiveType> PrimitiveArray<T> {
mod tests {
use super::*;
use crate::builder::{Decimal128Builder, Decimal256Builder};
use crate::BooleanArray;
use crate::cast::downcast_array;
use crate::{ArrayRef, BooleanArray};
use std::sync::Arc;

#[test]
fn test_primitive_array_from_vec() {
Expand Down Expand Up @@ -1939,4 +2024,52 @@ mod tests {

array.value(4);
}

#[test]
fn test_into_builder() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let boxed: ArrayRef = Arc::new(array);
let col: Int32Array = downcast_array(&boxed);
drop(boxed);

let mut builder = col.into_builder().unwrap();

builder.append_value(4);
builder.append_null();
builder.append_value(2);

let expected: Int32Array = vec![Some(4), None, Some(2)].into_iter().collect();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have expected into_builder to keep the current values?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, currently the returned builder sets length to 0 so it will overwrite the existing values. Per the latest suggestion, it makes more sense to have the same length as the array. And we can ask for mutable slice if we want to mutate the existing values.


let new_array = builder.finish();
assert_eq!(expected, new_array);
}

#[test]
fn test_into_builder_cloned_array() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let boxed: ArrayRef = Arc::new(array);

let col: Int32Array = PrimitiveArray::<Int32Type>::from(boxed.data().clone());
let err = col.into_builder();

match err {
Ok(_) => panic!("Should not get builder from cloned array"),
Err(returned) => {
let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
assert_eq!(expected, returned)
}
}
}

#[test]
fn test_unary_mut() {
let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

let c = array.unary_mut(|x| x * 2 + 1).unwrap();
let expected: Int32Array = vec![3, 5, 7].into_iter().map(Some).collect();

assert_eq!(expected, c);
}
}
4 changes: 4 additions & 0 deletions arrow-array/src/builder/boolean_buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ impl BooleanBufferBuilder {
Self { buffer, len: 0 }
}

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this need to be provided with the length in bits?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, added.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, missed it.

Self { buffer, len: 0 }
}

#[inline]
pub fn len(&self) -> usize {
self.len
Expand Down
8 changes: 8 additions & 0 deletions arrow-array/src/builder/buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ impl<T: ArrowNativeType> BufferBuilder<T> {
}
}

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
Self {
buffer,
len: 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
len: 0,
len: buffer.len() / std::mem::size_of::<T::Native>(),

?

_marker: PhantomData,
}
}

/// Returns the current number of array elements in the internal buffer.
///
/// # Example:
Expand Down
11 changes: 10 additions & 1 deletion arrow-array/src/builder/null_buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::builder::BooleanBufferBuilder;
use arrow_buffer::Buffer;
use arrow_buffer::{Buffer, MutableBuffer};

/// Builder for creating the null bit buffer.
/// This builder only materializes the buffer when we append `false`.
Expand All @@ -42,6 +42,15 @@ impl NullBufferBuilder {
}
}

pub fn new_from_buffer(buffer: Option<MutableBuffer>, capacity: usize) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the option here is necessary, as if there is no MutableBuffer, callers can just call the regular constructor?

let bitmap_builder = buffer.map(BooleanBufferBuilder::new_from_buffer);
Self {
bitmap_builder,
len: 0,
capacity,
}
}

/// Appends `n` `true`s into the builder
/// to indicate that these `n` items are not nulls.
#[inline]
Expand Down
16 changes: 16 additions & 0 deletions arrow-array/src/builder/primitive_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::builder::null_buffer_builder::NullBufferBuilder;
use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::types::*;
use crate::{ArrayRef, ArrowPrimitiveType, PrimitiveArray};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayData;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -114,6 +115,21 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
}
}

pub fn new_from_buffer(
values_buffer: MutableBuffer,
null_buffer: Option<MutableBuffer>,
) -> Self {
let capacity = values_buffer.capacity();

Self {
values_builder: BufferBuilder::<T::Native>::new_from_buffer(values_buffer),
null_buffer_builder: NullBufferBuilder::new_from_buffer(
null_buffer,
capacity,
),
}
}

/// Returns the capacity of this builder measured in slots of type `T`
pub fn capacity(&self) -> usize {
self.values_builder.capacity()
Expand Down
24 changes: 23 additions & 1 deletion arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::fmt::Debug;
use std::iter::FromIterator;
use std::ptr::NonNull;
use std::sync::Arc;
use std::{convert::AsRef, usize};
use std::{convert::AsRef, mem, usize};

use crate::alloc::{Allocation, Deallocation};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
Expand Down Expand Up @@ -227,6 +227,28 @@ impl Buffer {
pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
}

/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
pub fn into_mutable(self, len: usize) -> Result<MutableBuffer, Self> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this take a len? How does this differ from Buffer::length?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #3115 (comment)

let offset_ptr = self.as_ptr();
let offset = self.offset;
let length = self.length;
Arc::try_unwrap(self.data)
.map(|bytes| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to verify that bytes has deallocation of Deallocation::Arrow, otherwise this is not well formed

// The pointer of underlying buffer should not be offset.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a somewhat annoying limitation, I wonder if there is some way to avoid it 🤔

Perhaps we could push the offset into Bytes 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.

Wondering pushing the offset into Bytes can change it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah ideally we'd do something like make MutableArray hold a Bytes, and then push an offset into Bytes. We could even extend the Allocation trait to allow reallocation of custom allocated data. Not sure how much, if any, of that you would be interested in doing 😅 was more just an observation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.

Just question: i think Arc::try_unwrap has check the exactly one strong reference.
And all sliced use the clone(), So this is impossible panic right? 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clone on an Arc increments the strong count, it does not perform a deep copy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tustvold Sorry another question: I think in datafusion most PrimitiveArray data have only one reference during query. So base on this, most unary change using unary_mut like +1, will avoid memcpy than before using unary ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would likely need to update the kernels to make use of this, but theoretically. Whether this will make a major difference in practice I'm not sure, sorts, aggregates and joins, tend to dominate queries in my experience

assert_eq!(offset_ptr, bytes.ptr().as_ptr());

let mutable_buffer =
MutableBuffer::from_ptr(bytes.ptr(), len, bytes.capacity());
mem::forget(bytes);
mutable_buffer
})
.map_err(|bytes| Buffer {
data: bytes,
offset,
length,
})
}
}

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
Expand Down
9 changes: 9 additions & 0 deletions arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ impl MutableBuffer {
}
}

/// Allocates a new [MutableBuffer] from given pointer `ptr`, `capacity`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be "safer" for the to be from_bytes and contain the necessary logic to make that well-formed, i.e. check the deallocation is as expected, etc...

pub(crate) fn from_ptr(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
Self {
data: ptr,
len,
capacity,
}
}

/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
/// This is useful to create a buffer for packed bitmaps.
pub fn new_null(len: usize) -> Self {
Expand Down
4 changes: 4 additions & 0 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ impl ArrayData {
&self.buffers[..]
}

pub fn get_buffers(self) -> Vec<Buffer> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about child_data, null_buffers, etc... ?

I wonder if a more general pattern would be to use ArrayData::buffers, clone the desired buffers and then drop the ArrayData?

self.buffers
}

/// Returns a slice of children data arrays
pub fn child_data(&self) -> &[ArrayData] {
&self.child_data[..]
Expand Down