From 742a5908b119fadef51be596c5c8f07b37a01de5 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 12 Jul 2022 00:40:41 -0700 Subject: [PATCH] Support MapType in FFI (#2042) * Support Map in FFI * For review and add test for map_keys_sorted * Set MAP_KEYS_SORTED in try_from --- .../tests/test_sql.py | 18 +++++- arrow/src/datatypes/ffi.rs | 35 ++++++++++- arrow/src/ffi.rs | 61 ++++++++++++++++--- 3 files changed, 101 insertions(+), 13 deletions(-) diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index a17ba6d0613..a19edf0ccd0 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -86,11 +86,11 @@ def assert_pyarrow_leak(): ] ), pa.dictionary(pa.int8(), pa.string()), + pa.map_(pa.string(), pa.int32()), ] _unsupported_pyarrow_types = [ pa.decimal256(76, 38), - pa.map_(pa.string(), pa.int32()), pa.union( [pa.field("a", pa.binary(10)), pa.field("b", pa.string())], mode=pa.lib.UnionMode_DENSE, @@ -229,6 +229,22 @@ def test_list_array(): del a del b +def test_map_array(): + """ + Python -> Rust -> Python + """ + data = [ + [{'key': "a", 'value': 1}, {'key': "b", 'value': 2}], + [{'key': "c", 'value': 3}, {'key': "d", 'value': 4}] + ] + a = pa.array(data, pa.map_(pa.string(), pa.int32())) + b = rust.round_trip_array(a) + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() + assert a.type == b.type + del a + del b + def test_fixed_len_list_array(): """ Python -> Rust -> Python diff --git a/arrow/src/datatypes/ffi.rs b/arrow/src/datatypes/ffi.rs index 2f1b092a862..0857da61263 100644 --- a/arrow/src/datatypes/ffi.rs +++ b/arrow/src/datatypes/ffi.rs @@ -17,6 +17,7 @@ use std::convert::TryFrom; +use crate::datatypes::DataType::Map; use crate::{ datatypes::{DataType, Field, Schema, TimeUnit}, error::{ArrowError, Result}, @@ -68,6 +69,11 @@ impl TryFrom<&FFI_ArrowSchema> for DataType { let fields = c_schema.children().map(Field::try_from); DataType::Struct(fields.collect::>>()?) } + "+m" => { + let c_child = c_schema.child(0); + let map_keys_sorted = c_schema.map_keys_sorted(); + DataType::Map(Box::new(Field::try_from(c_child)?), map_keys_sorted) + } // Parametrized types, requiring string parse other => { match other.splitn(2, ':').collect::>().as_slice() { @@ -201,7 +207,8 @@ impl TryFrom<&DataType> for FFI_ArrowSchema { let children = match dtype { DataType::List(child) | DataType::LargeList(child) - | DataType::FixedSizeList(child, _) => { + | DataType::FixedSizeList(child, _) + | DataType::Map(child, _) => { vec![FFI_ArrowSchema::try_from(child.as_ref())?] } DataType::Struct(fields) => fields @@ -215,7 +222,13 @@ impl TryFrom<&DataType> for FFI_ArrowSchema { } else { None }; - FFI_ArrowSchema::try_new(&format, children, dictionary) + + let flags = match dtype { + Map(_, true) => Flags::MAP_KEYS_SORTED, + _ => Flags::empty(), + }; + + FFI_ArrowSchema::try_new(&format, children, dictionary)?.with_flags(flags) } } @@ -262,6 +275,7 @@ fn get_format_string(dtype: &DataType) -> Result { DataType::List(_) => Ok("+l".to_string()), DataType::LargeList(_) => Ok("+L".to_string()), DataType::Struct(_) => Ok("+s".to_string()), + DataType::Map(_, _) => Ok("+m".to_string()), DataType::Dictionary(key_data_type, _) => get_format_string(key_data_type), other => Err(ArrowError::CDataInterface(format!( "The datatype \"{:?}\" is still not supported in Rust implementation", @@ -279,6 +293,7 @@ impl TryFrom<&Field> for FFI_ArrowSchema { } else { Flags::empty() }; + FFI_ArrowSchema::try_from(field.data_type())? .with_name(field.name())? .with_flags(flags) @@ -404,4 +419,20 @@ mod tests { assert!(result.is_err()); Ok(()) } + + #[test] + fn test_map_keys_sorted() -> Result<()> { + let keys = Field::new("keys", DataType::Int32, false); + let values = Field::new("values", DataType::UInt32, false); + let entry_struct = DataType::Struct(vec![keys, values]); + + // Construct a map array from the above two + let map_data_type = + DataType::Map(Box::new(Field::new("entries", entry_struct, true)), true); + + let arrow_schema = FFI_ArrowSchema::try_from(map_data_type)?; + assert!(arrow_schema.map_keys_sorted()); + + Ok(()) + } } diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index b55ca7eb5ce..1287f3b0a4d 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -284,6 +284,10 @@ impl FFI_ArrowSchema { pub fn dictionary(&self) -> Option<&Self> { unsafe { self.dictionary.as_ref() } } + + pub fn map_keys_sorted(&self) -> bool { + self.flags & 0b00000100 != 0 + } } impl Drop for FFI_ArrowSchema { @@ -348,11 +352,18 @@ fn bit_width(data_type: &DataType, i: usize) -> Result { data_type, i ))) }, + // Variable-size list and map have one i32 buffer. // Variable-sized binaries: have two buffers. // "small": first buffer is i32, second is in bytes - (DataType::Utf8, 1) | (DataType::Binary, 1) | (DataType::List(_), 1) => size_of::() * 8, - (DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => size_of::() * 8, - (DataType::Utf8, _) | (DataType::Binary, _) | (DataType::List(_), _)=> { + (DataType::Utf8, 1) | (DataType::Binary, 1) | (DataType::List(_), 1) | (DataType::Map(_, _), 1) => size_of::() * 8, + (DataType::Utf8, 2) | (DataType::Binary, 2) => size_of::() * 8, + (DataType::List(_), _) | (DataType::Map(_, _), _) => { + return Err(ArrowError::CDataInterface(format!( + "The datatype \"{:?}\" expects 2 buffers, but requested {}. Please verify that the C data interface is correctly implemented.", + data_type, i + ))) + } + (DataType::Utf8, _) | (DataType::Binary, _) => { return Err(ArrowError::CDataInterface(format!( "The datatype \"{:?}\" expects 3 buffers, but requested {}. Please verify that the C data interface is correctly implemented.", data_type, i @@ -674,13 +685,14 @@ pub trait ArrowArrayRef { | (DataType::Binary, 1) | (DataType::LargeBinary, 1) | (DataType::List(_), 1) - | (DataType::LargeList(_), 1) => { + | (DataType::LargeList(_), 1) + | (DataType::Map(_, _), 1) => { // the len of the offset buffer (buffer 1) equals length + 1 let bits = bit_width(data_type, i)?; debug_assert_eq!(bits % 8, 0); (self.array().length as usize + 1) * (bits / 8) } - (DataType::Utf8, 2) | (DataType::Binary, 2) | (DataType::List(_), 2) => { + (DataType::Utf8, 2) | (DataType::Binary, 2) => { // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) let len = self.buffer_len(1)?; // first buffer is the null buffer => add(1) @@ -692,9 +704,7 @@ pub trait ArrowArrayRef { // get last offset (unsafe { *offset_buffer.add(len / size_of::() - 1) }) as usize } - (DataType::LargeUtf8, 2) - | (DataType::LargeBinary, 2) - | (DataType::LargeList(_), 2) => { + (DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) => { // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) let len = self.buffer_len(1)?; // first buffer is the null buffer => add(1) @@ -897,8 +907,9 @@ mod tests { use crate::array::{ export_array_into_raw, make_array, Array, ArrayData, BooleanArray, DecimalArray, DictionaryArray, DurationSecondArray, FixedSizeBinaryArray, FixedSizeListArray, - GenericBinaryArray, GenericListArray, GenericStringArray, Int32Array, NullArray, - OffsetSizeTrait, Time32MillisecondArray, TimestampMillisecondArray, + GenericBinaryArray, GenericListArray, GenericStringArray, Int32Array, MapArray, + NullArray, OffsetSizeTrait, Time32MillisecondArray, TimestampMillisecondArray, + UInt32Array, }; use crate::compute::kernels; use crate::datatypes::{Field, Int8Type}; @@ -1436,4 +1447,34 @@ mod tests { Ok(()) } + + #[test] + fn test_map_array() -> Result<()> { + let keys = vec!["a", "b", "c", "d", "e", "f", "g", "h"]; + let values_data = UInt32Array::from(vec![0u32, 10, 20, 30, 40, 50, 60, 70]); + + // Construct a buffer for value offsets, for the nested array: + // [[a, b, c], [d, e, f], [g, h]] + let entry_offsets = [0, 3, 6, 8]; + + let map_array = MapArray::new_from_strings( + keys.clone().into_iter(), + &values_data, + &entry_offsets, + ) + .unwrap(); + + // export it + let array = ArrowArray::try_from(map_array.data().clone())?; + + // (simulate consumer) import it + let data = ArrayData::try_from(array)?; + let array = make_array(data); + + // perform some operation + let array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(array, &map_array); + + Ok(()) + } }