Skip to content

Commit

Permalink
Support MapType in FFI (#2042)
Browse files Browse the repository at this point in the history
* Support Map in FFI

* For review and add test for map_keys_sorted

* Set MAP_KEYS_SORTED in try_from
  • Loading branch information
viirya committed Jul 12, 2022
1 parent ca5fe7d commit 742a590
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 13 deletions.
18 changes: 17 additions & 1 deletion arrow-pyarrow-integration-testing/tests/test_sql.py
Expand Up @@ -86,11 +86,11 @@ def assert_pyarrow_leak():
]
),
pa.dictionary(pa.int8(), pa.string()),
pa.map_(pa.string(), pa.int32()),
]

_unsupported_pyarrow_types = [
pa.decimal256(76, 38),
pa.map_(pa.string(), pa.int32()),
pa.union(
[pa.field("a", pa.binary(10)), pa.field("b", pa.string())],
mode=pa.lib.UnionMode_DENSE,
Expand Down Expand Up @@ -229,6 +229,22 @@ def test_list_array():
del a
del b

def test_map_array():
"""
Python -> Rust -> Python
"""
data = [
[{'key': "a", 'value': 1}, {'key': "b", 'value': 2}],
[{'key': "c", 'value': 3}, {'key': "d", 'value': 4}]
]
a = pa.array(data, pa.map_(pa.string(), pa.int32()))
b = rust.round_trip_array(a)
b.validate(full=True)
assert a.to_pylist() == b.to_pylist()
assert a.type == b.type
del a
del b

def test_fixed_len_list_array():
"""
Python -> Rust -> Python
Expand Down
35 changes: 33 additions & 2 deletions arrow/src/datatypes/ffi.rs
Expand Up @@ -17,6 +17,7 @@

use std::convert::TryFrom;

use crate::datatypes::DataType::Map;
use crate::{
datatypes::{DataType, Field, Schema, TimeUnit},
error::{ArrowError, Result},
Expand Down Expand Up @@ -68,6 +69,11 @@ impl TryFrom<&FFI_ArrowSchema> for DataType {
let fields = c_schema.children().map(Field::try_from);
DataType::Struct(fields.collect::<Result<Vec<_>>>()?)
}
"+m" => {
let c_child = c_schema.child(0);
let map_keys_sorted = c_schema.map_keys_sorted();
DataType::Map(Box::new(Field::try_from(c_child)?), map_keys_sorted)
}
// Parametrized types, requiring string parse
other => {
match other.splitn(2, ':').collect::<Vec<&str>>().as_slice() {
Expand Down Expand Up @@ -201,7 +207,8 @@ impl TryFrom<&DataType> for FFI_ArrowSchema {
let children = match dtype {
DataType::List(child)
| DataType::LargeList(child)
| DataType::FixedSizeList(child, _) => {
| DataType::FixedSizeList(child, _)
| DataType::Map(child, _) => {
vec![FFI_ArrowSchema::try_from(child.as_ref())?]
}
DataType::Struct(fields) => fields
Expand All @@ -215,7 +222,13 @@ impl TryFrom<&DataType> for FFI_ArrowSchema {
} else {
None
};
FFI_ArrowSchema::try_new(&format, children, dictionary)

let flags = match dtype {
Map(_, true) => Flags::MAP_KEYS_SORTED,
_ => Flags::empty(),
};

FFI_ArrowSchema::try_new(&format, children, dictionary)?.with_flags(flags)
}
}

Expand Down Expand Up @@ -262,6 +275,7 @@ fn get_format_string(dtype: &DataType) -> Result<String> {
DataType::List(_) => Ok("+l".to_string()),
DataType::LargeList(_) => Ok("+L".to_string()),
DataType::Struct(_) => Ok("+s".to_string()),
DataType::Map(_, _) => Ok("+m".to_string()),
DataType::Dictionary(key_data_type, _) => get_format_string(key_data_type),
other => Err(ArrowError::CDataInterface(format!(
"The datatype \"{:?}\" is still not supported in Rust implementation",
Expand All @@ -279,6 +293,7 @@ impl TryFrom<&Field> for FFI_ArrowSchema {
} else {
Flags::empty()
};

FFI_ArrowSchema::try_from(field.data_type())?
.with_name(field.name())?
.with_flags(flags)
Expand Down Expand Up @@ -404,4 +419,20 @@ mod tests {
assert!(result.is_err());
Ok(())
}

#[test]
fn test_map_keys_sorted() -> Result<()> {
let keys = Field::new("keys", DataType::Int32, false);
let values = Field::new("values", DataType::UInt32, false);
let entry_struct = DataType::Struct(vec![keys, values]);

// Construct a map array from the above two
let map_data_type =
DataType::Map(Box::new(Field::new("entries", entry_struct, true)), true);

let arrow_schema = FFI_ArrowSchema::try_from(map_data_type)?;
assert!(arrow_schema.map_keys_sorted());

Ok(())
}
}
61 changes: 51 additions & 10 deletions arrow/src/ffi.rs
Expand Up @@ -284,6 +284,10 @@ impl FFI_ArrowSchema {
pub fn dictionary(&self) -> Option<&Self> {
unsafe { self.dictionary.as_ref() }
}

pub fn map_keys_sorted(&self) -> bool {
self.flags & 0b00000100 != 0
}
}

impl Drop for FFI_ArrowSchema {
Expand Down Expand Up @@ -348,11 +352,18 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
data_type, i
)))
},
// Variable-size list and map have one i32 buffer.
// Variable-sized binaries: have two buffers.
// "small": first buffer is i32, second is in bytes
(DataType::Utf8, 1) | (DataType::Binary, 1) | (DataType::List(_), 1) => size_of::<i32>() * 8,
(DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => size_of::<u8>() * 8,
(DataType::Utf8, _) | (DataType::Binary, _) | (DataType::List(_), _)=> {
(DataType::Utf8, 1) | (DataType::Binary, 1) | (DataType::List(_), 1) | (DataType::Map(_, _), 1) => size_of::<i32>() * 8,
(DataType::Utf8, 2) | (DataType::Binary, 2) => size_of::<u8>() * 8,
(DataType::List(_), _) | (DataType::Map(_, _), _) => {
return Err(ArrowError::CDataInterface(format!(
"The datatype \"{:?}\" expects 2 buffers, but requested {}. Please verify that the C data interface is correctly implemented.",
data_type, i
)))
}
(DataType::Utf8, _) | (DataType::Binary, _) => {
return Err(ArrowError::CDataInterface(format!(
"The datatype \"{:?}\" expects 3 buffers, but requested {}. Please verify that the C data interface is correctly implemented.",
data_type, i
Expand Down Expand Up @@ -674,13 +685,14 @@ pub trait ArrowArrayRef {
| (DataType::Binary, 1)
| (DataType::LargeBinary, 1)
| (DataType::List(_), 1)
| (DataType::LargeList(_), 1) => {
| (DataType::LargeList(_), 1)
| (DataType::Map(_, _), 1) => {
// the len of the offset buffer (buffer 1) equals length + 1
let bits = bit_width(data_type, i)?;
debug_assert_eq!(bits % 8, 0);
(self.array().length as usize + 1) * (bits / 8)
}
(DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => {
(DataType::Utf8, 2) | (DataType::Binary, 2) => {
// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1)?;
// first buffer is the null buffer => add(1)
Expand All @@ -692,9 +704,7 @@ pub trait ArrowArrayRef {
// get last offset
(unsafe { *offset_buffer.add(len / size_of::<i32>() - 1) }) as usize
}
(DataType::LargeUtf8, 2)
| (DataType::LargeBinary, 2)
| (DataType::LargeList(_), 2) => {
(DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) => {
// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1)?;
// first buffer is the null buffer => add(1)
Expand Down Expand Up @@ -897,8 +907,9 @@ mod tests {
use crate::array::{
export_array_into_raw, make_array, Array, ArrayData, BooleanArray, DecimalArray,
DictionaryArray, DurationSecondArray, FixedSizeBinaryArray, FixedSizeListArray,
GenericBinaryArray, GenericListArray, GenericStringArray, Int32Array, NullArray,
OffsetSizeTrait, Time32MillisecondArray, TimestampMillisecondArray,
GenericBinaryArray, GenericListArray, GenericStringArray, Int32Array, MapArray,
NullArray, OffsetSizeTrait, Time32MillisecondArray, TimestampMillisecondArray,
UInt32Array,
};
use crate::compute::kernels;
use crate::datatypes::{Field, Int8Type};
Expand Down Expand Up @@ -1436,4 +1447,34 @@ mod tests {

Ok(())
}

#[test]
fn test_map_array() -> Result<()> {
let keys = vec!["a", "b", "c", "d", "e", "f", "g", "h"];
let values_data = UInt32Array::from(vec![0u32, 10, 20, 30, 40, 50, 60, 70]);

// Construct a buffer for value offsets, for the nested array:
// [[a, b, c], [d, e, f], [g, h]]
let entry_offsets = [0, 3, 6, 8];

let map_array = MapArray::new_from_strings(
keys.clone().into_iter(),
&values_data,
&entry_offsets,
)
.unwrap();

// export it
let array = ArrowArray::try_from(map_array.data().clone())?;

// (simulate consumer) import it
let data = ArrayData::try_from(array)?;
let array = make_array(data);

// perform some operation
let array = array.as_any().downcast_ref::<MapArray>().unwrap();
assert_eq!(array, &map_array);

Ok(())
}
}

0 comments on commit 742a590

Please sign in to comment.