Skip to content

Commit

Permalink
Deprecate RecordBatch::concat (#2594) (#2683)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Sep 8, 2022
1 parent 326ea5e commit 7a22ec0
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 99 deletions.
104 changes: 103 additions & 1 deletion arrow/src/compute/kernels/concat.rs
Expand Up @@ -31,8 +31,9 @@
//! ```

use crate::array::*;
use crate::datatypes::DataType;
use crate::datatypes::{DataType, SchemaRef};
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

fn compute_str_values_length<Offset: OffsetSizeTrait>(arrays: &[&ArrayData]) -> usize {
arrays
Expand Down Expand Up @@ -102,6 +103,35 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef> {
Ok(make_array(mutable.freeze()))
}

/// Concatenates `batches` together into a single record batch.
pub fn concat_batches(schema: &SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
if batches.is_empty() {
return Ok(RecordBatch::new_empty(schema.clone()));
}
if let Some((i, _)) = batches
.iter()
.enumerate()
.find(|&(_, batch)| batch.schema() != *schema)
{
return Err(ArrowError::InvalidArgumentError(format!(
"batches[{}] schema is different with argument schema.",
i
)));
}
let field_num = schema.fields().len();
let mut arrays = Vec::with_capacity(field_num);
for i in 0..field_num {
let array = concat(
&batches
.iter()
.map(|batch| batch.column(i).as_ref())
.collect::<Vec<_>>(),
)?;
arrays.push(array);
}
RecordBatch::try_new(schema.clone(), arrays)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -569,4 +599,76 @@ mod tests {
assert!(!copy.data().child_data()[0].ptr_eq(&combined.data().child_data()[0]));
assert!(!new.data().child_data()[0].ptr_eq(&combined.data().child_data()[0]));
}

#[test]
fn concat_record_batches() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap();
assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
assert_eq!(2, new_batch.num_columns());
assert_eq!(4, new_batch.num_rows());
}

#[test]
fn concat_empty_record_batch() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let batch = RecordBatch::concat(&schema, &[]).unwrap();
assert_eq!(batch.schema().as_ref(), schema.as_ref());
assert_eq!(0, batch.num_rows());
}

#[test]
fn concat_record_batches_of_different_schemas() {
let schema1 = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let schema2 = Arc::new(Schema::new(vec![
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema1.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema2,
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let error = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap_err();
assert_eq!(
error.to_string(),
"Invalid argument error: batches[1] schema is different with argument schema.",
);
}
}
100 changes: 2 additions & 98 deletions arrow/src/record_batch.rs
Expand Up @@ -21,7 +21,6 @@
use std::sync::Arc;

use crate::array::*;
use crate::compute::kernels::concat::concat;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};

Expand Down Expand Up @@ -390,32 +389,9 @@ impl RecordBatch {
}

/// Concatenates `batches` together into a single record batch.
#[deprecated(note = "please use arrow::compute::concat_batches")]
pub fn concat(schema: &SchemaRef, batches: &[Self]) -> Result<Self> {
if batches.is_empty() {
return Ok(RecordBatch::new_empty(schema.clone()));
}
if let Some((i, _)) = batches
.iter()
.enumerate()
.find(|&(_, batch)| batch.schema() != *schema)
{
return Err(ArrowError::InvalidArgumentError(format!(
"batches[{}] schema is different with argument schema.",
i
)));
}
let field_num = schema.fields().len();
let mut arrays = Vec::with_capacity(field_num);
for i in 0..field_num {
let array = concat(
&batches
.iter()
.map(|batch| batch.column(i).as_ref())
.collect::<Vec<_>>(),
)?;
arrays.push(array);
}
Self::try_new(schema.clone(), arrays)
crate::compute::concat_batches(schema, batches)
}
}

Expand Down Expand Up @@ -713,78 +689,6 @@ mod tests {
assert_eq!(batch.column(1).as_ref(), int.as_ref());
}

#[test]
fn concat_record_batches() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap();
assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
assert_eq!(2, new_batch.num_columns());
assert_eq!(4, new_batch.num_rows());
}

#[test]
fn concat_empty_record_batch() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let batch = RecordBatch::concat(&schema, &[]).unwrap();
assert_eq!(batch.schema().as_ref(), schema.as_ref());
assert_eq!(0, batch.num_rows());
}

#[test]
fn concat_record_batches_of_different_schemas() {
let schema1 = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let schema2 = Arc::new(Schema::new(vec![
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema1.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema2,
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let error = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap_err();
assert_eq!(
error.to_string(),
"Invalid argument error: batches[1] schema is different with argument schema.",
);
}

#[test]
fn record_batch_equality() {
let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
Expand Down

0 comments on commit 7a22ec0

Please sign in to comment.