Skip to content

Commit

Permalink
Write validity buffer for UnionArray in V4 IPC message (#1794)
Browse files Browse the repository at this point in the history
* Write validity buffer for Union Array in V4 IPC message

* Add test

* Fix clippy

* Fix clippy
  • Loading branch information
viirya committed Jun 6, 2022
1 parent 9b220a5 commit e3191e7
Showing 1 changed file with 80 additions and 6 deletions.
86 changes: 80 additions & 6 deletions arrow/src/ipc/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ impl IpcDataGenerator {
offset,
array.len(),
array.null_count(),
write_options,
);
}

Expand Down Expand Up @@ -389,6 +390,7 @@ impl IpcDataGenerator {
0,
array_data.len(),
array_data.null_count(),
write_options,
);

// write data
Expand Down Expand Up @@ -849,7 +851,18 @@ fn write_continuation<W: Write>(
Ok(written)
}

/// In V4, null types have no validity bitmap
/// In V5 and later, null and union types have no validity bitmap
fn has_validity_bitmap(data_type: &DataType, write_options: &IpcWriteOptions) -> bool {
if write_options.metadata_version < ipc::MetadataVersion::V5 {
!matches!(data_type, DataType::Null)
} else {
!matches!(data_type, DataType::Null | DataType::Union(_, _, _))
}
}

/// Write array data to a vector of bytes
#[allow(clippy::too_many_arguments)]
fn write_array_data(
array_data: &ArrayData,
buffers: &mut Vec<ipc::Buffer>,
Expand All @@ -858,6 +871,7 @@ fn write_array_data(
offset: i64,
num_rows: usize,
null_count: usize,
write_options: &IpcWriteOptions,
) -> i64 {
let mut offset = offset;
if !matches!(array_data.data_type(), DataType::Null) {
Expand All @@ -867,12 +881,7 @@ fn write_array_data(
// where null_count is always 0.
nodes.push(ipc::FieldNode::new(num_rows as i64, num_rows as i64));
}
// NullArray does not have any buffers, thus the null buffer is not generated
// UnionArray does not have a validity buffer
if !matches!(
array_data.data_type(),
DataType::Null | DataType::Union(_, _, _)
) {
if has_validity_bitmap(array_data.data_type(), write_options) {
// write null buffer if exists
let null_buffer = match array_data.null_buffer() {
None => {
Expand Down Expand Up @@ -904,6 +913,7 @@ fn write_array_data(
offset,
data_ref.len(),
data_ref.null_count(),
write_options,
);
});
}
Expand Down Expand Up @@ -1433,4 +1443,68 @@ mod tests {
},
);
}

fn write_union_file(options: IpcWriteOptions) {
let schema = Schema::new(vec![Field::new(
"union",
DataType::Union(
vec![
Field::new("a", DataType::Int32, false),
Field::new("c", DataType::Float64, false),
],
vec![0, 1],
UnionMode::Sparse,
),
true,
)]);
let mut builder = UnionBuilder::new_sparse(5);
builder.append::<Int32Type>("a", 1).unwrap();
builder.append_null::<Int32Type>("a").unwrap();
builder.append::<Float64Type>("c", 3.0).unwrap();
builder.append_null::<Float64Type>("c").unwrap();
builder.append::<Int32Type>("a", 4).unwrap();
let union = builder.build().unwrap();

let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(union) as ArrayRef],
)
.unwrap();
let file_name = "target/debug/testdata/union.arrow_file";
{
let file = File::create(&file_name).unwrap();
let mut writer =
FileWriter::try_new_with_options(file, &schema, options).unwrap();

writer.write(&batch).unwrap();
writer.finish().unwrap();
}

{
let file = File::open(&file_name).unwrap();
let reader = FileReader::try_new(file, None).unwrap();
reader.for_each(|maybe_batch| {
maybe_batch
.unwrap()
.columns()
.iter()
.zip(batch.columns())
.for_each(|(a, b)| {
assert_eq!(a.data_type(), b.data_type());
assert_eq!(a.len(), b.len());
assert_eq!(a.null_count(), b.null_count());
});
});
}
}

#[test]
fn test_write_union_file_v4_v5() {
write_union_file(
IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap(),
);
write_union_file(
IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap(),
);
}
}

0 comments on commit e3191e7

Please sign in to comment.