From 7a22ec0cf297f9c9d0c229bf3b699ec9f4edffc0 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 8 Sep 2022 18:27:23 +0100 Subject: [PATCH] Deprecate RecordBatch::concat (#2594) (#2683) --- arrow/src/compute/kernels/concat.rs | 104 +++++++++++++++++++++++++++- arrow/src/record_batch.rs | 100 +------------------------- 2 files changed, 105 insertions(+), 99 deletions(-) diff --git a/arrow/src/compute/kernels/concat.rs b/arrow/src/compute/kernels/concat.rs index a7a3ffc782c..df6436efe84 100644 --- a/arrow/src/compute/kernels/concat.rs +++ b/arrow/src/compute/kernels/concat.rs @@ -31,8 +31,9 @@ //! ``` use crate::array::*; -use crate::datatypes::DataType; +use crate::datatypes::{DataType, SchemaRef}; use crate::error::{ArrowError, Result}; +use crate::record_batch::RecordBatch; fn compute_str_values_length(arrays: &[&ArrayData]) -> usize { arrays @@ -102,6 +103,35 @@ pub fn concat(arrays: &[&dyn Array]) -> Result { Ok(make_array(mutable.freeze())) } +/// Concatenates `batches` together into a single record batch. +pub fn concat_batches(schema: &SchemaRef, batches: &[RecordBatch]) -> Result { + if batches.is_empty() { + return Ok(RecordBatch::new_empty(schema.clone())); + } + if let Some((i, _)) = batches + .iter() + .enumerate() + .find(|&(_, batch)| batch.schema() != *schema) + { + return Err(ArrowError::InvalidArgumentError(format!( + "batches[{}] schema is different with argument schema.", + i + ))); + } + let field_num = schema.fields().len(); + let mut arrays = Vec::with_capacity(field_num); + for i in 0..field_num { + let array = concat( + &batches + .iter() + .map(|batch| batch.column(i).as_ref()) + .collect::>(), + )?; + arrays.push(array); + } + RecordBatch::try_new(schema.clone(), arrays) +} + #[cfg(test)] mod tests { use super::*; @@ -569,4 +599,76 @@ mod tests { assert!(!copy.data().child_data()[0].ptr_eq(&combined.data().child_data()[0])); assert!(!new.data().child_data()[0].ptr_eq(&combined.data().child_data()[0])); } + + #[test] + fn concat_record_batches() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["a", "b"])), + ], + ) + .unwrap(); + let batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["c", "d"])), + ], + ) + .unwrap(); + let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap(); + assert_eq!(new_batch.schema().as_ref(), schema.as_ref()); + assert_eq!(2, new_batch.num_columns()); + assert_eq!(4, new_batch.num_rows()); + } + + #[test] + fn concat_empty_record_batch() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + let batch = RecordBatch::concat(&schema, &[]).unwrap(); + assert_eq!(batch.schema().as_ref(), schema.as_ref()); + assert_eq!(0, batch.num_rows()); + } + + #[test] + fn concat_record_batches_of_different_schemas() { + let schema1 = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + let schema2 = Arc::new(Schema::new(vec![ + Field::new("c", DataType::Int32, false), + Field::new("d", DataType::Utf8, false), + ])); + let batch1 = RecordBatch::try_new( + schema1.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2])), + Arc::new(StringArray::from(vec!["a", "b"])), + ], + ) + .unwrap(); + let batch2 = RecordBatch::try_new( + schema2, + vec![ + Arc::new(Int32Array::from(vec![3, 4])), + Arc::new(StringArray::from(vec!["c", "d"])), + ], + ) + .unwrap(); + let error = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap_err(); + assert_eq!( + error.to_string(), + "Invalid argument error: batches[1] schema is different with argument schema.", + ); + } } diff --git a/arrow/src/record_batch.rs b/arrow/src/record_batch.rs index 47257b496c1..d1db1f1a4c1 100644 --- a/arrow/src/record_batch.rs +++ b/arrow/src/record_batch.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use crate::array::*; -use crate::compute::kernels::concat::concat; use crate::datatypes::*; use crate::error::{ArrowError, Result}; @@ -390,32 +389,9 @@ impl RecordBatch { } /// Concatenates `batches` together into a single record batch. + #[deprecated(note = "please use arrow::compute::concat_batches")] pub fn concat(schema: &SchemaRef, batches: &[Self]) -> Result { - if batches.is_empty() { - return Ok(RecordBatch::new_empty(schema.clone())); - } - if let Some((i, _)) = batches - .iter() - .enumerate() - .find(|&(_, batch)| batch.schema() != *schema) - { - return Err(ArrowError::InvalidArgumentError(format!( - "batches[{}] schema is different with argument schema.", - i - ))); - } - let field_num = schema.fields().len(); - let mut arrays = Vec::with_capacity(field_num); - for i in 0..field_num { - let array = concat( - &batches - .iter() - .map(|batch| batch.column(i).as_ref()) - .collect::>(), - )?; - arrays.push(array); - } - Self::try_new(schema.clone(), arrays) + crate::compute::concat_batches(schema, batches) } } @@ -713,78 +689,6 @@ mod tests { assert_eq!(batch.column(1).as_ref(), int.as_ref()); } - #[test] - fn concat_record_batches() { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, false), - ])); - let batch1 = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![1, 2])), - Arc::new(StringArray::from(vec!["a", "b"])), - ], - ) - .unwrap(); - let batch2 = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![3, 4])), - Arc::new(StringArray::from(vec!["c", "d"])), - ], - ) - .unwrap(); - let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap(); - assert_eq!(new_batch.schema().as_ref(), schema.as_ref()); - assert_eq!(2, new_batch.num_columns()); - assert_eq!(4, new_batch.num_rows()); - } - - #[test] - fn concat_empty_record_batch() { - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, false), - ])); - let batch = RecordBatch::concat(&schema, &[]).unwrap(); - assert_eq!(batch.schema().as_ref(), schema.as_ref()); - assert_eq!(0, batch.num_rows()); - } - - #[test] - fn concat_record_batches_of_different_schemas() { - let schema1 = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Utf8, false), - ])); - let schema2 = Arc::new(Schema::new(vec![ - Field::new("c", DataType::Int32, false), - Field::new("d", DataType::Utf8, false), - ])); - let batch1 = RecordBatch::try_new( - schema1.clone(), - vec![ - Arc::new(Int32Array::from(vec![1, 2])), - Arc::new(StringArray::from(vec!["a", "b"])), - ], - ) - .unwrap(); - let batch2 = RecordBatch::try_new( - schema2, - vec![ - Arc::new(Int32Array::from(vec![3, 4])), - Arc::new(StringArray::from(vec!["c", "d"])), - ], - ) - .unwrap(); - let error = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap_err(); - assert_eq!( - error.to_string(), - "Invalid argument error: batches[1] schema is different with argument schema.", - ); - } - #[test] fn record_batch_equality() { let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);