Skip to content

Commit

Permalink
Use IPC row count info in IPC reader (#1796)
Browse files Browse the repository at this point in the history
* Use IPC row count info

* Add test

* Update arrow/src/ipc/reader.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
  • Loading branch information
viirya and tustvold committed Jun 6, 2022
1 parent e3191e7 commit b5fbd11
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions arrow/src/ipc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::compute::cast;
use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef, UnionMode};
use crate::error::{ArrowError, Result};
use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader};

use ipc::CONTINUATION_MARKER;
use DataType::*;
Expand Down Expand Up @@ -608,6 +608,11 @@ pub fn read_record_batch(
let mut node_index = 0;
let mut arrays = vec![];

let options = RecordBatchOptions {
row_count: Some(batch.length() as usize),
..Default::default()
};

if let Some(projection) = projection {
// project fields
for (idx, field) in schema.fields().iter().enumerate() {
Expand Down Expand Up @@ -643,7 +648,11 @@ pub fn read_record_batch(
}
}

RecordBatch::try_new(Arc::new(schema.project(projection)?), arrays)
RecordBatch::try_new_with_options(
Arc::new(schema.project(projection)?),
arrays,
&options,
)
} else {
// keep track of index as lists require more than one node
for field in schema.fields() {
Expand All @@ -661,7 +670,7 @@ pub fn read_record_batch(
buffer_index = triple.2;
arrays.push(triple.0);
}
RecordBatch::try_new(schema, arrays)
RecordBatch::try_new_with_options(schema, arrays, &options)
}
}

Expand Down Expand Up @@ -1933,4 +1942,17 @@ mod tests {
let output_batch = roundtrip_ipc_stream(&input_batch);
assert_eq!(input_batch, output_batch);
}

#[test]
fn test_no_columns_batch() {
let schema = Arc::new(Schema::new(vec![]));
let options = RecordBatchOptions {
match_field_names: true,
row_count: Some(10),
};
let input_batch =
RecordBatch::try_new_with_options(schema, vec![], &options).unwrap();
let output_batch = roundtrip_ipc_stream(&input_batch);
assert_eq!(input_batch, output_batch);
}
}

0 comments on commit b5fbd11

Please sign in to comment.