Skip to content

Commit

Permalink
Fix ipc schema custom_metadata serialization (#3282)
Browse files Browse the repository at this point in the history
* Fix ipc schema custom_metadata serialization

* Fix ipc doc test

* PR comments
  • Loading branch information
Jefffrey committed Dec 8, 2022
1 parent 2e806b0 commit 7b71713
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 45 deletions.
95 changes: 58 additions & 37 deletions arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,37 @@ pub fn schema_to_fb_offset<'a>(
fbb: &mut FlatBufferBuilder<'a>,
schema: &Schema,
) -> WIPOffset<crate::Schema<'a>> {
let mut fields = vec![];
for field in schema.fields() {
let fb_field = build_field(fbb, field);
fields.push(fb_field);
}

let mut custom_metadata = vec![];
for (k, v) in schema.metadata() {
let fb_key_name = fbb.create_string(k.as_str());
let fb_val_name = fbb.create_string(v.as_str());
let fields = schema
.fields()
.iter()
.map(|field| build_field(fbb, field))
.collect::<Vec<_>>();
let fb_field_list = fbb.create_vector(&fields);

let mut kv_builder = crate::KeyValueBuilder::new(fbb);
kv_builder.add_key(fb_key_name);
kv_builder.add_value(fb_val_name);
custom_metadata.push(kv_builder.finish());
}
let fb_metadata_list = if !schema.metadata().is_empty() {
let custom_metadata = schema
.metadata()
.iter()
.map(|(k, v)| {
let fb_key_name = fbb.create_string(k);
let fb_val_name = fbb.create_string(v);

let fb_field_list = fbb.create_vector(&fields);
let fb_metadata_list = fbb.create_vector(&custom_metadata);
let mut kv_builder = crate::KeyValueBuilder::new(fbb);
kv_builder.add_key(fb_key_name);
kv_builder.add_value(fb_val_name);
kv_builder.finish()
})
.collect::<Vec<_>>();
Some(fbb.create_vector(&custom_metadata))
} else {
None
};

let mut builder = crate::SchemaBuilder::new(fbb);
builder.add_fields(fb_field_list);
builder.add_custom_metadata(fb_metadata_list);
if let Some(fb_metadata_list) = fb_metadata_list {
builder.add_custom_metadata(fb_metadata_list);
}
builder.finish()
}

Expand Down Expand Up @@ -1031,32 +1039,45 @@ mod tests {

#[test]
fn schema_from_bytes() {
// bytes of a schema generated from python (0.14.0), saved as a `crate::Message`.
// the schema is: Field("field1", DataType::UInt32, false)
// Bytes of a schema generated via the following Python code, using pyarrow 10.0.1:
//
// import pyarrow as pa
// schema = pa.schema([pa.field('field1', pa.uint32(), nullable=False)])
// sink = pa.BufferOutputStream()
// with pa.ipc.new_stream(sink, schema) as writer:
// pass
// # stripping continuation & length prefix & suffix bytes to get only schema bytes
// [x for x in sink.getvalue().to_pybytes()][8:-8]
let bytes: Vec<u8> = vec![
16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 3, 0,
16, 0, 0, 0, 0, 0, 10, 0, 12, 0, 6, 0, 5, 0, 8, 0, 10, 0, 0, 0, 0, 1, 4, 0,
12, 0, 0, 0, 8, 0, 8, 0, 0, 0, 4, 0, 8, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0, 20,
0, 0, 0, 16, 0, 20, 0, 8, 0, 0, 0, 7, 0, 12, 0, 0, 0, 16, 0, 16, 0, 0, 0, 0,
0, 0, 2, 32, 0, 0, 0, 20, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6, 0, 8, 0,
4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49, 0, 0,
0, 0, 0, 0,
0, 0, 2, 16, 0, 0, 0, 32, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 6, 0, 0, 0, 102,
105, 101, 108, 100, 49, 0, 0, 0, 0, 6, 0, 8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0,
0,
];
let ipc = crate::root_as_message(&bytes[..]).unwrap();
let ipc = crate::root_as_message(&bytes).unwrap();
let schema = ipc.header_as_schema().unwrap();

// a message generated from Rust, same as the Python one
let bytes: Vec<u8> = vec![
16, 0, 0, 0, 0, 0, 10, 0, 14, 0, 12, 0, 11, 0, 4, 0, 10, 0, 0, 0, 20, 0, 0,
0, 0, 0, 0, 1, 3, 0, 10, 0, 12, 0, 0, 0, 8, 0, 4, 0, 10, 0, 0, 0, 8, 0, 0, 0,
8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 16, 0, 0, 0, 12, 0, 18, 0, 12, 0, 0, 0,
11, 0, 4, 0, 12, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 2, 20, 0, 0, 0, 0, 0, 6, 0,
8, 0, 4, 0, 6, 0, 0, 0, 32, 0, 0, 0, 6, 0, 0, 0, 102, 105, 101, 108, 100, 49,
0, 0,
];
let ipc2 = crate::root_as_message(&bytes[..]).unwrap();
let schema2 = ipc.header_as_schema().unwrap();
// generate same message with Rust
let data_gen = crate::writer::IpcDataGenerator::default();
let arrow_schema =
Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
let bytes = data_gen
.schema_to_bytes(&arrow_schema, &crate::writer::IpcWriteOptions::default())
.ipc_message;

let ipc2 = crate::root_as_message(&bytes).unwrap();
let schema2 = ipc2.header_as_schema().unwrap();

// can't compare schema directly as it compares the underlying bytes, which can differ
assert!(schema.custom_metadata().is_none());
assert!(schema2.custom_metadata().is_none());
assert_eq!(schema.endianness(), schema2.endianness());
assert!(schema.features().is_none());
assert!(schema2.features().is_none());
assert_eq!(fb_to_schema(schema), fb_to_schema(schema2));

assert_eq!(schema, schema2);
assert_eq!(ipc.version(), ipc2.version());
assert_eq!(ipc.header_type(), ipc2.header_type());
assert_eq!(ipc.bodyLength(), ipc2.bodyLength());
Expand Down
14 changes: 6 additions & 8 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,15 +793,13 @@ impl<W: Write> StreamWriter<W> {
/// # fn main() -> Result<(), ArrowError> {
/// // The result we expect from an empty schema
/// let expected = vec![
/// 255, 255, 255, 255, 64, 0, 0, 0,
/// 255, 255, 255, 255, 48, 0, 0, 0,
/// 16, 0, 0, 0, 0, 0, 10, 0,
/// 14, 0, 12, 0, 11, 0, 4, 0,
/// 10, 0, 0, 0, 20, 0, 0, 0,
/// 0, 0, 0, 1, 4, 0, 10, 0,
/// 12, 0, 0, 0, 8, 0, 4, 0,
/// 10, 0, 0, 0, 8, 0, 0, 0,
/// 8, 0, 0, 0, 0, 0, 0, 0,
/// 0, 0, 0, 0, 0, 0, 0, 0,
/// 12, 0, 10, 0, 9, 0, 4, 0,
/// 10, 0, 0, 0, 16, 0, 0, 0,
/// 0, 1, 4, 0, 8, 0, 8, 0,
/// 0, 0, 4, 0, 8, 0, 0, 0,
/// 4, 0, 0, 0, 0, 0, 0, 0,
/// 255, 255, 255, 255, 0, 0, 0, 0
/// ];
///
Expand Down

0 comments on commit 7b71713

Please sign in to comment.