Add COW conversion for Buffer and PrimitiveArray and unary_mut #3115

Merged
merged 11 commits into from
Nov 17, 2022
33 changes: 33 additions & 0 deletions arrow-array/src/array/mod.rs
@@ -372,6 +372,28 @@ impl<'a, T: Array> Array for &'a T {
    }
}

/// A kind of `Array` which can be downcasted by `downcast_array`.
pub(crate) trait SizedArray: AsDynAny + Array {}

/// A trait used to help conversion from `Arc<Self>` to `Arc<Any>`.
pub(crate) trait AsDynAny: Any {
    fn as_dyn_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
}

impl<T: Sized + Send + Sync + 'static> AsDynAny for T {
    fn as_dyn_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
        self
    }
}

/// Downcasts an Arc-ed sized `Array` to Arc-ed underlying arrow type.
#[allow(dead_code)]
pub(crate) fn downcast_array<T: Send + Sync + 'static>(
    array: Arc<dyn SizedArray>,
Contributor

@tustvold tustvold Nov 15, 2022

How do you go from an ArrayRef to Arc<dyn SizedArray>?

Edit: Is the intention to change ArrayRef to be Arc<dyn SizedArray>? 🤔

Member Author

I cannot make AsDynAny work with Arc<dyn Array>. We have implemented Array for the reference type &'a T, but Any must be 'static, and the type must be Sized as well.

Member Author

Oh, I see what you mean. Hmm, good question. It currently does not work with ArrayRef.

Contributor

Proposal in #3117

) -> Option<Arc<T>> {
    AsDynAny::as_dyn_any(array).downcast().ok()
}
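
Editorial sketch, not part of the PR: a minimal standalone illustration of the downcast pattern in this hunk, assuming only `std`. The names `ToyArray`, `Ints`, and `downcast_toy` are hypothetical stand-ins for `SizedArray`, a concrete array type, and `downcast_array`. It shows why the blanket impl is limited to `Sized + 'static` types: the body relies on unsizing `Arc<T>` into `Arc<dyn Any + Send + Sync>`, and `Any` itself requires `'static`, which is what rules out the `&'a T` implementations mentioned in the thread.

```rust
use std::any::Any;
use std::sync::Arc;

/// Helper trait mirroring the shape of the PR's `AsDynAny` (toy copy).
trait AsDynAny: Any {
    fn as_dyn_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}

// `T` is implicitly `Sized` here; the coercion in the body needs a sized,
// `'static`, thread-safe type.
impl<T: Send + Sync + 'static> AsDynAny for T {
    fn as_dyn_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        self
    }
}

/// Hypothetical array trait that can be recovered as a concrete type.
trait ToyArray: AsDynAny {
    fn len(&self) -> usize;
}

/// Hypothetical concrete array.
struct Ints(Vec<i32>);

impl ToyArray for Ints {
    fn len(&self) -> usize {
        self.0.len()
    }
}

/// Returns `Some(Arc<T>)` when the erased array really is a `T`.
fn downcast_toy<T: Send + Sync + 'static>(array: Arc<dyn ToyArray>) -> Option<Arc<T>> {
    AsDynAny::as_dyn_any(array).downcast::<T>().ok()
}
```

Because the conversion consumes the `Arc` rather than borrowing it, the caller gets back an owned `Arc<T>`, which is what a later `Arc::try_unwrap` needs.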

/// A generic trait for accessing the values of an [`Array`]
///
/// # Validity
@@ -1113,4 +1135,15 @@ mod tests {
        assert!(compute_my_thing(&arr));
        assert!(compute_my_thing(arr.as_ref()));
    }

    #[test]
    fn test_downcast_array() {
        let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

        let boxed: Arc<dyn SizedArray> = Arc::new(array);
        let array: Arc<Int32Array> = downcast_array(boxed).unwrap();

        let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
        assert_eq!(&array, &Arc::new(expected));
    }
}
80 changes: 79 additions & 1 deletion arrow-array/src/array/primitive_array.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::array::SizedArray;
use crate::builder::{BooleanBufferBuilder, BufferBuilder, PrimitiveBuilder};
use crate::iterator::PrimitiveIter;
use crate::raw_pointer::RawPtrBox;
@@ -489,6 +490,41 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
)
}
}

    /// Returns `PrimitiveBuilder` of this primitive array for mutating its values if the underlying
    /// data buffer is not shared by others.
    pub fn into_builder(self) -> Result<PrimitiveBuilder<T>, Self> {
        let null_buffer = self
            .data
            .null_buffer()
            .cloned()
            .and_then(|b| b.into_mutable().ok());
Contributor

I think this needs to be split into the part that clones the null buffer and into_mutable after data is dropped.

I also think this method should fail if the null buffer cannot be converted; as far as I can tell it currently just loses the null buffer.

Member Author

Yeah, that's why the doc test is failing now. I'm looking into how to make it work.


        let len = self.len();
        let null_bit_buffer = self.data.null_buffer().cloned();
Contributor

We need to bit_slice this to be consistent with the values buffer


        let mut buffers = self.data.get_buffers();
        let buffer = buffers.remove(0);

        let builder = buffer
            .into_mutable()
            .map(|buffer| PrimitiveBuilder::<T>::new_from_buffer(buffer, null_buffer));

        match builder {
            Ok(builder) => Ok(builder),
            Err(buffer) => {
                let builder = ArrayData::builder(T::DATA_TYPE)
                    .len(len)
                    .add_buffer(buffer)
                    .null_bit_buffer(null_bit_buffer);

                let array_data = unsafe { builder.build_unchecked() };
                let array = PrimitiveArray::<T>::from(array_data);

                Err(array)
            }
        }
    }
}
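
Editorial sketch, not part of the PR: the review note about cloning the null buffer before the array data is dropped can be reproduced with plain `Arc`s. This is a toy model under stated assumptions: `ToyData` is a hypothetical stand-in for `ArrayData`, and `Arc<Vec<u8>>` stands in for a shared `Buffer`. While the original owner is alive, the cloned handle has a strong count of 2, so taking exclusive ownership fails; after the owner is dropped, it succeeds.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `ArrayData`: holds a shared handle to a null bitmap.
struct ToyData {
    nulls: Option<Arc<Vec<u8>>>,
}

/// Clones the handle and tries to take ownership while `data` still holds
/// its own handle. Returns whether exclusive ownership was obtained.
fn unwrap_while_data_alive(data: ToyData) -> bool {
    let nulls = data.nulls.clone(); // strong count is now 2
    let unique = nulls.map(|b| Arc::try_unwrap(b).is_ok()).unwrap_or(false);
    drop(data); // too late: the attempt above already failed
    unique
}

/// Clones the handle, drops `data` first, and only then tries to take ownership.
fn unwrap_after_data_dropped(data: ToyData) -> bool {
    let nulls = data.nulls.clone(); // strong count is 2
    drop(data); // strong count is back to 1
    nulls.map(|b| Arc::try_unwrap(b).is_ok()).unwrap_or(false)
}
```

In the first function the failure is silent if the result is simply discarded with `.ok()`, which matches the reviewer's concern that the null buffer gets lost rather than causing the conversion to fail.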

#[inline]
@@ -529,6 +565,9 @@ impl<T: ArrowPrimitiveType> Array for PrimitiveArray<T> {
}
}

/// Makes `PrimitiveArray<T>` can be "downcast_array".
impl<T: ArrowPrimitiveType> SizedArray for PrimitiveArray<T> {}

impl<'a, T: ArrowPrimitiveType> ArrayAccessor for &'a PrimitiveArray<T> {
type Item = T::Native;

@@ -1036,7 +1075,8 @@ impl<T: DecimalType + ArrowPrimitiveType> PrimitiveArray<T> {
mod tests {
use super::*;
use crate::builder::{Decimal128Builder, Decimal256Builder};
-use crate::BooleanArray;
+use crate::{downcast_array, BooleanArray};
use std::sync::Arc;

#[test]
fn test_primitive_array_from_vec() {
@@ -1939,4 +1979,42 @@ mod tests {

array.value(4);
}

    #[test]
    fn test_into_builder() {
        let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

        let boxed: Arc<dyn SizedArray> = Arc::new(array);
        let array: Arc<Int32Array> = downcast_array(boxed).unwrap();

        let col: Int32Array = Arc::try_unwrap(array).unwrap();
        let mut builder = col.into_builder().unwrap();

        builder.append_value(4);
        builder.append_null();
        builder.append_value(2);

        let expected: Int32Array = vec![Some(4), None, Some(2)].into_iter().collect();
Contributor

I would have expected into_builder to keep the current values?

Member Author

Yeah, currently the returned builder sets its length to 0, so appended values overwrite the existing ones. Per the latest suggestion, it makes more sense for the builder to have the same length as the array. We can then ask for a mutable slice if we want to mutate the existing values.


        let new_array = builder.finish();
        assert_eq!(expected, new_array);
    }

    #[test]
    fn test_into_builder_cloned_array() {
        let array: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();

        let boxed: Arc<dyn SizedArray> = Arc::new(array);

        let col: Int32Array = PrimitiveArray::<Int32Type>::from(boxed.data().clone());
        let err = col.into_builder();

        match err {
            Ok(_) => panic!("Should not get builder from cloned array"),
            Err(returned) => {
                let expected: Int32Array = vec![1, 2, 3].into_iter().map(Some).collect();
                assert_eq!(expected, returned)
            }
        }
    }
}
4 changes: 4 additions & 0 deletions arrow-array/src/builder/boolean_buffer_builder.rs
@@ -33,6 +33,10 @@ impl BooleanBufferBuilder {
Self { buffer, len: 0 }
}

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
Contributor

I think this needs to be provided with the length in bits?

Member Author

Yes, added.

Contributor

Did you?

Member Author

Oh, missed it.

Self { buffer, len: 0 }
}
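
Editorial sketch, not part of the PR: why a bitmap builder created from existing bytes needs the length in bits. A byte buffer alone cannot say how many of its bits are in use, so with `len: 0` the next append would overwrite bit 0. The type `ToyBitBuilder` and its methods are hypothetical and use a `Vec<u8>` with least-significant-bit-first packing; this is not the arrow `BooleanBufferBuilder` API.

```rust
/// Hypothetical toy bitmap builder (LSB-first bit packing).
struct ToyBitBuilder {
    bytes: Vec<u8>,
    /// Number of valid bits, not bytes.
    len: usize,
}

impl ToyBitBuilder {
    /// Wraps existing bytes; `len` says how many bits are already in use.
    fn new_from_bytes(bytes: Vec<u8>, len: usize) -> Self {
        assert!(len <= bytes.len() * 8, "bit length exceeds buffer size");
        Self { bytes, len }
    }

    /// Appends one bit after the existing `len` bits.
    fn append(&mut self, value: bool) {
        let byte = self.len / 8;
        if byte == self.bytes.len() {
            self.bytes.push(0);
        }
        let mask = 1u8 << (self.len % 8);
        if value {
            self.bytes[byte] |= mask;
        } else {
            self.bytes[byte] &= !mask;
        }
        self.len += 1;
    }

    /// Reads bit `i`, which must be below the current length.
    fn get(&self, i: usize) -> bool {
        assert!(i < self.len, "bit index out of bounds");
        (self.bytes[i / 8] & (1u8 << (i % 8))) != 0
    }
}
```

With the bit length supplied, an append lands after the existing validity bits instead of clobbering them.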

#[inline]
pub fn len(&self) -> usize {
self.len
8 changes: 8 additions & 0 deletions arrow-array/src/builder/buffer_builder.rs
@@ -124,6 +124,14 @@ impl<T: ArrowNativeType> BufferBuilder<T> {
}
}

pub fn new_from_buffer(buffer: MutableBuffer) -> Self {
Self {
buffer,
len: 0,
Contributor

Suggested change:
-len: 0,
+len: buffer.len() / std::mem::size_of::<T::Native>(),

?

_marker: PhantomData,
}
}
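
Editorial sketch, not part of the PR: deriving the element count from the byte length, as the suggested change proposes. `ToyBufferBuilder` is a hypothetical type over a `Vec<u8>`, not the arrow `BufferBuilder`. Two details are assumptions worth checking against the real code: the sketch divides by `size_of::<T>()` because `T` here is already the native type, and it computes the length before moving the bytes into the struct, since a field initializer that reads `buffer.len()` after `buffer` has been moved would not compile.

```rust
use std::marker::PhantomData;
use std::mem::size_of;

/// Hypothetical typed builder over raw bytes.
struct ToyBufferBuilder<T> {
    bytes: Vec<u8>,
    /// Number of `T` elements, not bytes.
    len: usize,
    _marker: PhantomData<T>,
}

impl<T> ToyBufferBuilder<T> {
    /// Adopts existing bytes and keeps their contents as the initial elements.
    fn new_from_bytes(bytes: Vec<u8>) -> Self {
        assert!(size_of::<T>() > 0, "zero-sized element types are not supported");
        // Compute the element count before `bytes` is moved into the struct.
        let len = bytes.len() / size_of::<T>();
        Self {
            bytes,
            len,
            _marker: PhantomData,
        }
    }

    /// Number of elements currently held.
    fn len(&self) -> usize {
        self.len
    }

    /// Number of bytes currently held.
    fn byte_len(&self) -> usize {
        self.bytes.len()
    }
}
```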

/// Returns the current number of array elements in the internal buffer.
///
/// # Example:
11 changes: 10 additions & 1 deletion arrow-array/src/builder/null_buffer_builder.rs
@@ -16,7 +16,7 @@
// under the License.

use crate::builder::BooleanBufferBuilder;
-use arrow_buffer::Buffer;
+use arrow_buffer::{Buffer, MutableBuffer};

/// Builder for creating the null bit buffer.
/// This builder only materializes the buffer when we append `false`.
@@ -42,6 +42,15 @@ impl NullBufferBuilder {
}
}

pub fn new_from_buffer(buffer: Option<MutableBuffer>, capacity: usize) -> Self {
Contributor

I don't think the option here is necessary, as if there is no MutableBuffer, callers can just call the regular constructor?

let bitmap_builder = buffer.map(BooleanBufferBuilder::new_from_buffer);
Self {
bitmap_builder,
len: 0,
capacity,
}
}

/// Appends `n` `true`s into the builder
/// to indicate that these `n` items are not nulls.
#[inline]
16 changes: 16 additions & 0 deletions arrow-array/src/builder/primitive_builder.rs
@@ -19,6 +19,7 @@ use crate::builder::null_buffer_builder::NullBufferBuilder;
use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::types::*;
use crate::{ArrayRef, ArrowPrimitiveType, PrimitiveArray};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayData;
use std::any::Any;
use std::sync::Arc;
@@ -114,6 +115,21 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
}
}

    pub fn new_from_buffer(
        values_buffer: MutableBuffer,
        null_buffer: Option<MutableBuffer>,
    ) -> Self {
        let capacity = values_buffer.capacity();

        Self {
            values_builder: BufferBuilder::<T::Native>::new_from_buffer(values_buffer),
            null_buffer_builder: NullBufferBuilder::new_from_buffer(
                null_buffer,
                capacity,
            ),
        }
    }

/// Returns the capacity of this builder measured in slots of type `T`
pub fn capacity(&self) -> usize {
self.values_builder.capacity()
24 changes: 23 additions & 1 deletion arrow-buffer/src/buffer/immutable.rs
@@ -19,7 +19,7 @@ use std::fmt::Debug;
use std::iter::FromIterator;
use std::ptr::NonNull;
use std::sync::Arc;
-use std::{convert::AsRef, usize};
+use std::{convert::AsRef, mem, usize};

use crate::alloc::{Allocation, Deallocation};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
@@ -227,6 +227,28 @@ impl Buffer {
    pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
        UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
    }

    /// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
    pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
        let offset_ptr = self.as_ptr();
        let offset = self.offset;
        let length = self.length;
        Arc::try_unwrap(self.data)
            .map(|bytes| {
Contributor

We need to verify that bytes has deallocation of Deallocation::Arrow, otherwise this is not well formed

                // The pointer of underlying buffer should not be offset.
Contributor

This is a somewhat annoying limitation, I wonder if there is some way to avoid it 🤔

Perhaps we could push the offset into Bytes 🤔

Member Author

For the limitation, I assume that a non-zero offset means the Bytes is shared with, or sliced from, another buffer, so it is not allowed to become mutable here.

I wonder whether pushing the offset into Bytes would change that?

Contributor

Yeah ideally we'd do something like make MutableArray hold a Bytes, and then push an offset into Bytes. We could even extend the Allocation trait to allow reallocation of custom allocated data. Not sure how much, if any, of that you would be interested in doing 😅 was more just an observation

Member

> For the limitation, I assume that a non-zero offset means the Bytes is shared/sliced from others. So it is disallowed to be mutable here.

Just a question: I think Arc::try_unwrap already checks that there is exactly one strong reference, and all slices go through clone(), so this panic should be impossible, right? 🤔

Contributor

Clone on an Arc increments the strong count, it does not perform a deep copy

Member

@tustvold Sorry, another question: I think in DataFusion most PrimitiveArray data has only one reference during a query. Based on that, would most unary operations such as +1 avoid a memcpy when using unary_mut, compared with using unary?

Contributor

We would likely need to update the kernels to make use of this, but theoretically, yes. Whether this will make a major difference in practice I'm not sure; sorts, aggregates and joins tend to dominate queries in my experience.

                assert_eq!(offset_ptr, bytes.ptr().as_ptr());

                let mutable_buffer =
                    MutableBuffer::from_ptr(bytes.ptr(), bytes.capacity());
                mem::forget(bytes);
                mutable_buffer
            })
            .map_err(|bytes| Buffer {
                data: bytes,
                offset,
                length,
            })
    }
}
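
Editorial sketch, not part of the PR: a toy model of the copy-on-write check discussed in this thread, assuming `Arc<Vec<u8>>` in place of the shared `Bytes` allocation. `ToyBuffer` and its methods are hypothetical. It illustrates the two points raised in review: slicing clones the `Arc` (bumping the strong count without copying data), and a buffer can be uniquely owned yet still carry a non-zero offset once the buffer it was sliced from has been dropped. Unlike the PR, which asserts on the pointer, this sketch returns the buffer in `Err` for the offset case.

```rust
use std::sync::Arc;

/// Hypothetical immutable buffer: shared bytes plus a view window.
#[derive(Debug)]
struct ToyBuffer {
    data: Arc<Vec<u8>>,
    offset: usize,
    length: usize,
}

impl ToyBuffer {
    fn new(bytes: Vec<u8>) -> Self {
        let length = bytes.len();
        Self { data: Arc::new(bytes), offset: 0, length }
    }

    /// Zero-copy slice: clones the `Arc` handle, not the bytes.
    fn slice(&self, offset: usize) -> Self {
        assert!(offset <= self.length, "slice offset out of bounds");
        Self {
            data: Arc::clone(&self.data),
            offset: self.offset + offset,
            length: self.length - offset,
        }
    }

    /// Returns the owned bytes only if this view is unshared and not offset.
    fn into_mutable(self) -> Result<Vec<u8>, Self> {
        let ToyBuffer { data, offset, length } = self;
        if offset != 0 {
            // Unique or not, the view does not start at the allocation start.
            return Err(ToyBuffer { data, offset, length });
        }
        Arc::try_unwrap(data).map_err(|data| ToyBuffer { data, offset, length })
    }
}
```

Returning `Err(self)` in both failure cases lets the caller fall back to a copying path without losing the original buffer.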

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
9 changes: 9 additions & 0 deletions arrow-buffer/src/buffer/mutable.rs
@@ -92,6 +92,15 @@ impl MutableBuffer {
}
}

/// Allocates a new [MutableBuffer] from given pointer `ptr`, `capacity`.
Contributor

I think it would be "safer" for this to be from_bytes and contain the necessary logic to make that well-formed, i.e. check the deallocation is as expected, etc...

pub(crate) fn from_ptr(ptr: NonNull<u8>, capacity: usize) -> Self {
Self {
data: ptr,
len: 0,
capacity,
}
}
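
Editorial sketch, not part of the PR: one possible shape for the `from_bytes` constructor suggested in review, which refuses allocations that the library's own allocator did not create. Every name here (`ToyDeallocation`, `ToyBytes`, `ToyMutableBuffer`) is hypothetical, and a `Vec<u8>` stands in for the raw pointer; the real `Bytes` and `Deallocation` types in arrow-buffer may differ. The idea is only that the check lives inside the constructor, so callers cannot build a mutable buffer over memory that must be released some other way.

```rust
/// Hypothetical description of how an allocation must be released.
#[derive(Debug, PartialEq)]
enum ToyDeallocation {
    /// Allocated by the library's own allocator, with this capacity in bytes.
    Native(usize),
    /// Owned by foreign code, for example memory imported over FFI.
    Foreign,
}

/// Hypothetical stand-in for an immutable byte allocation.
#[derive(Debug, PartialEq)]
struct ToyBytes {
    data: Vec<u8>,
    deallocation: ToyDeallocation,
}

/// Hypothetical stand-in for a growable, mutable buffer.
#[derive(Debug, PartialEq)]
struct ToyMutableBuffer {
    data: Vec<u8>,
    capacity: usize,
}

impl ToyMutableBuffer {
    /// Takes over the allocation only when it is safe to grow and free it
    /// with the native allocator; otherwise hands the bytes back unchanged.
    fn from_bytes(bytes: ToyBytes) -> Result<Self, ToyBytes> {
        match bytes.deallocation {
            ToyDeallocation::Native(capacity) => Ok(Self {
                data: bytes.data,
                capacity,
            }),
            ToyDeallocation::Foreign => Err(bytes),
        }
    }
}
```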

/// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
/// This is useful to create a buffer for packed bitmaps.
pub fn new_null(len: usize) -> Self {
4 changes: 4 additions & 0 deletions arrow-data/src/data.rs
@@ -387,6 +387,10 @@ impl ArrayData {
&self.buffers[..]
}

pub fn get_buffers(self) -> Vec<Buffer> {
Contributor

What about child_data, null_buffers, etc... ?

I wonder if a more general pattern would be to use ArrayData::buffers, clone the desired buffers and then drop the ArrayData?

self.buffers
}

/// Returns a slice of children data arrays
pub fn child_data(&self) -> &[ArrayData] {
&self.child_data[..]