Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add validation to RecordBatch for non-nullable fields containing null values #1890

Merged
merged 8 commits into from Jun 18, 2022
2 changes: 1 addition & 1 deletion arrow/src/array/array_binary.rs
Expand Up @@ -1806,7 +1806,7 @@ mod tests {
)]
fn fixed_size_binary_array_all_null_in_batch_with_schema() {
let schema =
Schema::new(vec![Field::new("a", DataType::FixedSizeBinary(2), false)]);
Schema::new(vec![Field::new("a", DataType::FixedSizeBinary(2), true)]);

let none_option: Option<[u8; 2]> = None;
let item = FixedSizeBinaryArray::try_from_sparse_iter(
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/csv/reader.rs
Expand Up @@ -1439,7 +1439,7 @@ mod tests {
fn test_nulls() {
let schema = Schema::new(vec![
Field::new("c_int", DataType::UInt64, false),
Field::new("c_float", DataType::Float32, false),
Field::new("c_float", DataType::Float32, true),
Field::new("c_string", DataType::Utf8, false),
]);

Expand Down
4 changes: 2 additions & 2 deletions arrow/src/ipc/writer.rs
Expand Up @@ -962,7 +962,7 @@ mod tests {

#[test]
fn test_write_file() {
let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
let values: Vec<Option<u32>> = vec![
Some(999),
None,
Expand Down Expand Up @@ -1011,7 +1011,7 @@ mod tests {
let schema = Schema::new(vec![
Field::new("nulls", DataType::Null, true),
Field::new("int32s", DataType::Int32, false),
Field::new("nulls2", DataType::Null, false),
Field::new("nulls2", DataType::Null, true),
Field::new("f64s", DataType::Float64, false),
]);
let array1 = NullArray::new(32);
Expand Down
11 changes: 5 additions & 6 deletions arrow/src/json/reader.rs
Expand Up @@ -33,7 +33,7 @@
//! let schema = Schema::new(vec![
//! Field::new("a", DataType::Float64, false),
//! Field::new("b", DataType::Float64, false),
//! Field::new("c", DataType::Float64, false),
//! Field::new("c", DataType::Float64, true),
//! ]);
//!
//! let file = File::open("test/data/basic.json").unwrap();
Expand Down Expand Up @@ -1869,7 +1869,7 @@ mod tests {
#[test]
fn test_json_basic_schema() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float32, false),
Field::new("c", DataType::Boolean, false),
Field::new("d", DataType::Utf8, false),
Expand Down Expand Up @@ -1917,8 +1917,7 @@ mod tests {

#[test]
fn test_json_format_strings_for_date() {
let schema =
Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
let schema = Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, true)]));
let e = schema.column_with_name("e").unwrap();
assert_eq!(&DataType::Date32, e.1.data_type());
let mut fmts = HashMap::new();
Expand Down Expand Up @@ -1952,7 +1951,7 @@ mod tests {
// Implicit: omitting fields from a schema
// Explicit: supplying a vec of fields to take
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float32, false),
Field::new("c", DataType::Boolean, false),
]);
Expand All @@ -1964,7 +1963,7 @@ mod tests {
);
let reader_schema = reader.schema();
let expected_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("a", DataType::Int32, true),
Field::new("c", DataType::Boolean, false),
]));
assert_eq!(reader_schema, expected_schema);
Expand Down
38 changes: 19 additions & 19 deletions arrow/src/json/writer.rs
Expand Up @@ -879,11 +879,11 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("nanos", arr_nanos.data_type().clone(), false),
Field::new("micros", arr_micros.data_type().clone(), false),
Field::new("millis", arr_millis.data_type().clone(), false),
Field::new("secs", arr_secs.data_type().clone(), false),
Field::new("name", arr_names.data_type().clone(), false),
Field::new("nanos", arr_nanos.data_type().clone(), true),
Field::new("micros", arr_micros.data_type().clone(), true),
Field::new("millis", arr_millis.data_type().clone(), true),
Field::new("secs", arr_secs.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

Expand Down Expand Up @@ -929,8 +929,8 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("date32", arr_date32.data_type().clone(), false),
Field::new("date64", arr_date64.data_type().clone(), false),
Field::new("date32", arr_date32.data_type().clone(), true),
Field::new("date64", arr_date64.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), false),
]);
let schema = Arc::new(schema);
Expand Down Expand Up @@ -968,11 +968,11 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("time32sec", arr_time32sec.data_type().clone(), false),
Field::new("time32msec", arr_time32msec.data_type().clone(), false),
Field::new("time64usec", arr_time64usec.data_type().clone(), false),
Field::new("time64nsec", arr_time64nsec.data_type().clone(), false),
Field::new("name", arr_names.data_type().clone(), false),
Field::new("time32sec", arr_time32sec.data_type().clone(), true),
Field::new("time32msec", arr_time32msec.data_type().clone(), true),
Field::new("time64usec", arr_time64usec.data_type().clone(), true),
Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

Expand Down Expand Up @@ -1011,11 +1011,11 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("duration_sec", arr_durationsec.data_type().clone(), false),
Field::new("duration_msec", arr_durationmsec.data_type().clone(), false),
Field::new("duration_usec", arr_durationusec.data_type().clone(), false),
Field::new("duration_nsec", arr_durationnsec.data_type().clone(), false),
Field::new("name", arr_names.data_type().clone(), false),
Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

Expand Down Expand Up @@ -1159,7 +1159,7 @@ mod tests {
DataType::List(Box::new(list_inner_type.clone())),
false,
);
let field_c2 = Field::new("c2", DataType::Utf8, false);
let field_c2 = Field::new("c2", DataType::Utf8, true);
let schema = Schema::new(vec![field_c1.clone(), field_c2]);

// list column rows: [[1, 2], [3]], [], [[4, 5, 6]]
Expand Down Expand Up @@ -1444,7 +1444,7 @@ mod tests {

let map = MapArray::from(map_data);

let map_field = Field::new("map", map_data_type, false);
let map_field = Field::new("map", map_data_type, true);
let schema = Arc::new(Schema::new(vec![map_field]));

let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();
Expand Down
23 changes: 23 additions & 0 deletions arrow/src/record_batch.rs
Expand Up @@ -138,6 +138,19 @@ impl RecordBatch {
)
})?;

for (c, f) in columns.iter().zip(&schema.fields) {
if !f.is_nullable() && c.null_count() > 0 {
// hacky workaround for known issue with dictionary IPC encoding
// https://github.com/apache/arrow-rs/issues/1892
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1893 should fix this issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @viirya I have upmerged and removed this workaround

if !f.name().is_empty() {
return Err(ArrowError::InvalidArgumentError(format!(
"Column '{}' is declared as non-nullable but contains null values",
f.name()
)));
}
}
}

if columns.iter().any(|c| c.len() != row_count) {
let err = match options.row_count {
Some(_) => {
Expand Down Expand Up @@ -979,4 +992,14 @@ mod tests {
assert_ne!(a, b);
assert_eq!(b, RecordBatch::new_empty(schema))
}

#[test]
fn test_nulls_in_non_nullable_field() {
    // Constructing a batch whose only column holds a null value, while the
    // corresponding field is declared non-nullable, must fail validation
    // with an InvalidArgumentError naming the offending column.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let result =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![Some(1), None]))]);
    let message = result.err().unwrap().to_string();
    assert_eq!(
        message,
        "Invalid argument error: Column 'a' is declared as non-nullable but contains null values"
    );
}
}