Skip to content

Commit

Permalink
Add validation to RecordBatch for non-nullable fields containing null values (#1890)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Jun 18, 2022
1 parent c7f89e1 commit 535cd20
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 29 deletions.
2 changes: 1 addition & 1 deletion arrow/src/array/array_binary.rs
Expand Up @@ -1806,7 +1806,7 @@ mod tests {
)]
fn fixed_size_binary_array_all_null_in_batch_with_schema() {
let schema =
Schema::new(vec![Field::new("a", DataType::FixedSizeBinary(2), false)]);
Schema::new(vec![Field::new("a", DataType::FixedSizeBinary(2), true)]);

let none_option: Option<[u8; 2]> = None;
let item = FixedSizeBinaryArray::try_from_sparse_iter(
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/csv/reader.rs
Expand Up @@ -1439,7 +1439,7 @@ mod tests {
fn test_nulls() {
let schema = Schema::new(vec![
Field::new("c_int", DataType::UInt64, false),
Field::new("c_float", DataType::Float32, false),
Field::new("c_float", DataType::Float32, true),
Field::new("c_string", DataType::Utf8, false),
]);

Expand Down
4 changes: 2 additions & 2 deletions arrow/src/ipc/writer.rs
Expand Up @@ -962,7 +962,7 @@ mod tests {

#[test]
fn test_write_file() {
let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);
let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);
let values: Vec<Option<u32>> = vec![
Some(999),
None,
Expand Down Expand Up @@ -1011,7 +1011,7 @@ mod tests {
let schema = Schema::new(vec![
Field::new("nulls", DataType::Null, true),
Field::new("int32s", DataType::Int32, false),
Field::new("nulls2", DataType::Null, false),
Field::new("nulls2", DataType::Null, true),
Field::new("f64s", DataType::Float64, false),
]);
let array1 = NullArray::new(32);
Expand Down
11 changes: 5 additions & 6 deletions arrow/src/json/reader.rs
Expand Up @@ -33,7 +33,7 @@
//! let schema = Schema::new(vec![
//! Field::new("a", DataType::Float64, false),
//! Field::new("b", DataType::Float64, false),
//! Field::new("c", DataType::Float64, false),
//! Field::new("c", DataType::Float64, true),
//! ]);
//!
//! let file = File::open("test/data/basic.json").unwrap();
Expand Down Expand Up @@ -1869,7 +1869,7 @@ mod tests {
#[test]
fn test_json_basic_schema() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float32, false),
Field::new("c", DataType::Boolean, false),
Field::new("d", DataType::Utf8, false),
Expand Down Expand Up @@ -1917,8 +1917,7 @@ mod tests {

#[test]
fn test_json_format_strings_for_date() {
let schema =
Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
let schema = Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, true)]));
let e = schema.column_with_name("e").unwrap();
assert_eq!(&DataType::Date32, e.1.data_type());
let mut fmts = HashMap::new();
Expand Down Expand Up @@ -1952,7 +1951,7 @@ mod tests {
// Implicit: omitting fields from a schema
// Explicit: supplying a vec of fields to take
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float32, false),
Field::new("c", DataType::Boolean, false),
]);
Expand All @@ -1964,7 +1963,7 @@ mod tests {
);
let reader_schema = reader.schema();
let expected_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("a", DataType::Int32, true),
Field::new("c", DataType::Boolean, false),
]));
assert_eq!(reader_schema, expected_schema);
Expand Down
38 changes: 19 additions & 19 deletions arrow/src/json/writer.rs
Expand Up @@ -879,11 +879,11 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("nanos", arr_nanos.data_type().clone(), false),
Field::new("micros", arr_micros.data_type().clone(), false),
Field::new("millis", arr_millis.data_type().clone(), false),
Field::new("secs", arr_secs.data_type().clone(), false),
Field::new("name", arr_names.data_type().clone(), false),
Field::new("nanos", arr_nanos.data_type().clone(), true),
Field::new("micros", arr_micros.data_type().clone(), true),
Field::new("millis", arr_millis.data_type().clone(), true),
Field::new("secs", arr_secs.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

Expand Down Expand Up @@ -929,8 +929,8 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("date32", arr_date32.data_type().clone(), false),
Field::new("date64", arr_date64.data_type().clone(), false),
Field::new("date32", arr_date32.data_type().clone(), true),
Field::new("date64", arr_date64.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), false),
]);
let schema = Arc::new(schema);
Expand Down Expand Up @@ -968,11 +968,11 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("time32sec", arr_time32sec.data_type().clone(), false),
Field::new("time32msec", arr_time32msec.data_type().clone(), false),
Field::new("time64usec", arr_time64usec.data_type().clone(), false),
Field::new("time64nsec", arr_time64nsec.data_type().clone(), false),
Field::new("name", arr_names.data_type().clone(), false),
Field::new("time32sec", arr_time32sec.data_type().clone(), true),
Field::new("time32msec", arr_time32msec.data_type().clone(), true),
Field::new("time64usec", arr_time64usec.data_type().clone(), true),
Field::new("time64nsec", arr_time64nsec.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

Expand Down Expand Up @@ -1011,11 +1011,11 @@ mod tests {
let arr_names = StringArray::from(vec![Some("a"), Some("b")]);

let schema = Schema::new(vec![
Field::new("duration_sec", arr_durationsec.data_type().clone(), false),
Field::new("duration_msec", arr_durationmsec.data_type().clone(), false),
Field::new("duration_usec", arr_durationusec.data_type().clone(), false),
Field::new("duration_nsec", arr_durationnsec.data_type().clone(), false),
Field::new("name", arr_names.data_type().clone(), false),
Field::new("duration_sec", arr_durationsec.data_type().clone(), true),
Field::new("duration_msec", arr_durationmsec.data_type().clone(), true),
Field::new("duration_usec", arr_durationusec.data_type().clone(), true),
Field::new("duration_nsec", arr_durationnsec.data_type().clone(), true),
Field::new("name", arr_names.data_type().clone(), true),
]);
let schema = Arc::new(schema);

Expand Down Expand Up @@ -1159,7 +1159,7 @@ mod tests {
DataType::List(Box::new(list_inner_type.clone())),
false,
);
let field_c2 = Field::new("c2", DataType::Utf8, false);
let field_c2 = Field::new("c2", DataType::Utf8, true);
let schema = Schema::new(vec![field_c1.clone(), field_c2]);

// list column rows: [[1, 2], [3]], [], [[4, 5, 6]]
Expand Down Expand Up @@ -1444,7 +1444,7 @@ mod tests {

let map = MapArray::from(map_data);

let map_field = Field::new("map", map_data_type, false);
let map_field = Field::new("map", map_data_type, true);
let schema = Arc::new(Schema::new(vec![map_field]));

let batch = RecordBatch::try_new(schema, vec![Arc::new(map)]).unwrap();
Expand Down
19 changes: 19 additions & 0 deletions arrow/src/record_batch.rs
Expand Up @@ -138,6 +138,15 @@ impl RecordBatch {
)
})?;

for (c, f) in columns.iter().zip(&schema.fields) {
if !f.is_nullable() && c.null_count() > 0 {
return Err(ArrowError::InvalidArgumentError(format!(
"Column '{}' is declared as non-nullable but contains null values",
f.name()
)));
}
}

if columns.iter().any(|c| c.len() != row_count) {
let err = match options.row_count {
Some(_) => {
Expand Down Expand Up @@ -979,4 +988,14 @@ mod tests {
assert_ne!(a, b);
assert_eq!(b, RecordBatch::new_empty(schema))
}

#[test]
fn test_nulls_in_non_nullable_field() {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
let maybe_batch = RecordBatch::try_new(
schema,
vec![Arc::new(Int32Array::from(vec![Some(1), None]))],
);
assert_eq!("Invalid argument error: Column 'a' is declared as non-nullable but contains null values", format!("{}", maybe_batch.err().unwrap()));
}
}

0 comments on commit 535cd20

Please sign in to comment.