Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate RecordBatch::concat replace with concat_batches (#2594) #2683

Merged
merged 1 commit into from Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
104 changes: 103 additions & 1 deletion arrow/src/compute/kernels/concat.rs
Expand Up @@ -31,8 +31,9 @@
//! ```

use crate::array::*;
use crate::datatypes::DataType;
use crate::datatypes::{DataType, SchemaRef};
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

fn compute_str_values_length<Offset: OffsetSizeTrait>(arrays: &[&ArrayData]) -> usize {
arrays
Expand Down Expand Up @@ -102,6 +103,35 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef> {
Ok(make_array(mutable.freeze()))
}

/// Concatenates `batches` together into a single record batch.
pub fn concat_batches(schema: &SchemaRef, batches: &[RecordBatch]) -> Result<RecordBatch> {
if batches.is_empty() {
return Ok(RecordBatch::new_empty(schema.clone()));
}
if let Some((i, _)) = batches
.iter()
.enumerate()
.find(|&(_, batch)| batch.schema() != *schema)
{
return Err(ArrowError::InvalidArgumentError(format!(
"batches[{}] schema is different with argument schema.",
i
)));
}
let field_num = schema.fields().len();
let mut arrays = Vec::with_capacity(field_num);
for i in 0..field_num {
let array = concat(
&batches
.iter()
.map(|batch| batch.column(i).as_ref())
.collect::<Vec<_>>(),
)?;
arrays.push(array);
}
RecordBatch::try_new(schema.clone(), arrays)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -569,4 +599,76 @@ mod tests {
assert!(!copy.data().child_data()[0].ptr_eq(&combined.data().child_data()[0]));
assert!(!new.data().child_data()[0].ptr_eq(&combined.data().child_data()[0]));
}

#[test]
fn concat_record_batches() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap();
assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
assert_eq!(2, new_batch.num_columns());
assert_eq!(4, new_batch.num_rows());
}

#[test]
fn concat_empty_record_batch() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let batch = RecordBatch::concat(&schema, &[]).unwrap();
assert_eq!(batch.schema().as_ref(), schema.as_ref());
assert_eq!(0, batch.num_rows());
}

#[test]
fn concat_record_batches_of_different_schemas() {
let schema1 = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let schema2 = Arc::new(Schema::new(vec![
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema1.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema2,
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let error = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap_err();
assert_eq!(
error.to_string(),
"Invalid argument error: batches[1] schema is different with argument schema.",
);
}
}
100 changes: 2 additions & 98 deletions arrow/src/record_batch.rs
Expand Up @@ -21,7 +21,6 @@
use std::sync::Arc;

use crate::array::*;
use crate::compute::kernels::concat::concat;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};

Expand Down Expand Up @@ -390,32 +389,9 @@ impl RecordBatch {
}

/// Concatenates `batches` together into a single record batch.
#[deprecated(note = "please use arrow::compute::concat_batches")]
pub fn concat(schema: &SchemaRef, batches: &[Self]) -> Result<Self> {
if batches.is_empty() {
return Ok(RecordBatch::new_empty(schema.clone()));
}
if let Some((i, _)) = batches
.iter()
.enumerate()
.find(|&(_, batch)| batch.schema() != *schema)
{
return Err(ArrowError::InvalidArgumentError(format!(
"batches[{}] schema is different with argument schema.",
i
)));
}
let field_num = schema.fields().len();
let mut arrays = Vec::with_capacity(field_num);
for i in 0..field_num {
let array = concat(
&batches
.iter()
.map(|batch| batch.column(i).as_ref())
.collect::<Vec<_>>(),
)?;
arrays.push(array);
}
Self::try_new(schema.clone(), arrays)
crate::compute::concat_batches(schema, batches)
}
}

Expand Down Expand Up @@ -713,78 +689,6 @@ mod tests {
assert_eq!(batch.column(1).as_ref(), int.as_ref());
}

#[test]
fn concat_record_batches() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let new_batch = RecordBatch::concat(&schema, &[batch1, batch2]).unwrap();
assert_eq!(new_batch.schema().as_ref(), schema.as_ref());
assert_eq!(2, new_batch.num_columns());
assert_eq!(4, new_batch.num_rows());
}

#[test]
fn concat_empty_record_batch() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let batch = RecordBatch::concat(&schema, &[]).unwrap();
assert_eq!(batch.schema().as_ref(), schema.as_ref());
assert_eq!(0, batch.num_rows());
}

#[test]
fn concat_record_batches_of_different_schemas() {
let schema1 = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
]));
let schema2 = Arc::new(Schema::new(vec![
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Utf8, false),
]));
let batch1 = RecordBatch::try_new(
schema1.clone(),
vec![
Arc::new(Int32Array::from(vec![1, 2])),
Arc::new(StringArray::from(vec!["a", "b"])),
],
)
.unwrap();
let batch2 = RecordBatch::try_new(
schema2,
vec![
Arc::new(Int32Array::from(vec![3, 4])),
Arc::new(StringArray::from(vec!["c", "d"])),
],
)
.unwrap();
let error = RecordBatch::concat(&schema1, &[batch1, batch2]).unwrap_err();
assert_eq!(
error.to_string(),
"Invalid argument error: batches[1] schema is different with argument schema.",
);
}

#[test]
fn record_batch_equality() {
let id_arr1 = Int32Array::from(vec![1, 2, 3, 4]);
Expand Down