Add integration test for scan rows with selection (apache#2158)
* add test

* fix some skip bug.

* add it.

* fix skip in head.

* refine test case

* fix fmt.

* fix clippy

* fix comment.

* fix comment
Ted-Jiang committed Jul 27, 2022
1 parent e96ae8a commit d10d962
Showing 9 changed files with 276 additions and 12 deletions.
3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/byte_array.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
@@ -120,6 +120,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
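        // Skipping may be the first operation performed on this reader, so make sure
        // the underlying column reader has been set up from the page iterator first
        // (see `set_column_reader` in `array_reader/mod.rs`).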
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}

3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -25,7 +25,7 @@ use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};

use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::buffer::{
dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
};
@@ -181,6 +181,7 @@
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}

8 changes: 7 additions & 1 deletion parquet/src/arrow/array_reader/complex_object_array.rs
@@ -166,7 +166,13 @@ where
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
match self.column_reader.as_mut() {
Some(reader) => reader.skip_records(num_records),
None => Ok(0),
None => {
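                // No column reader has been set up yet, i.e. this skip happens before any
                // read; advance to the next column chunk first, then skip within it.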
if self.next_column_reader()? {
self.column_reader.as_mut().unwrap().skip_records(num_records)
                } else {
Ok(0)
}
}
}
}

27 changes: 27 additions & 0 deletions parquet/src/arrow/array_reader/mod.rs
@@ -144,3 +144,30 @@ where
}
Ok(records_read)
}

/// Uses `pages` to set up `record_reader`'s `column_reader`.
///
/// If records are skipped before any read operation, the `column_reader`
/// must be set via `set_page_reader` so that the `def_level_decoder` and
/// `rep_level_decoder` can be constructed.
fn set_column_reader<V, CV>(
record_reader: &mut GenericRecordReader<V, CV>,
pages: &mut dyn PageIterator,
) -> Result<bool>
where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
    if record_reader.column_reader().is_none() {
        // If records are skipped before any read operation,
        // the `column_reader` must be set via `set_page_reader`.
        if let Some(page_reader) = pages.next() {
            record_reader.set_page_reader(page_reader?)?;
            Ok(true)
        } else {
            Ok(false)
        }
    } else {
        Ok(true)
    }
}
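For context, a minimal standalone sketch of the situation this helper handles: a row selection whose first entry is a skip, so `skip_records` runs before any read has set up a column reader. It uses the `RowSelection` and `ArrowReaderOptions::with_row_selection` API exercised by the test added in this commit; the file path is hypothetical.

use std::fs::File;
use parquet::arrow::arrow_reader::{
    ArrowReader, ArrowReaderOptions, ParquetFileArrowReader, RowSelection,
};

fn main() {
    let file = File::open("data.parquet").unwrap(); // hypothetical file
    let selection = vec![
        RowSelection { row_count: 100, skip: true },  // skip before the first read
        RowSelection { row_count: 200, skip: false }, // then read 200 rows
    ];
    let options = ArrowReaderOptions::new().with_row_selection(selection);
    let mut reader =
        ParquetFileArrowReader::try_new_with_options(file, options).unwrap();
    for batch in reader.get_record_reader(64).unwrap() {
        println!("read {} rows", batch.unwrap().num_rows());
    }
}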
3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/null_array.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::column::page::PageIterator;
@@ -97,6 +97,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}

3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/primitive_array.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::array_reader::{read_records, set_column_reader, ArrayReader};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
@@ -222,6 +222,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}

223 changes: 219 additions & 4 deletions parquet/src/arrow/arrow_reader.rs
@@ -33,6 +33,7 @@ use crate::arrow::ProjectionMask;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::schema::types::SchemaDescriptor;

/// Arrow reader api.
@@ -217,7 +218,15 @@ impl ParquetFileArrowReader {
chunk_reader: R,
options: ArrowReaderOptions,
) -> Result<Self> {
let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
let file_reader = if options.selection.is_some() {
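            // When a row selection is supplied, open the file with the page index
            // loaded; the selection-based scan relies on page-level metadata.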
let options = ReadOptionsBuilder::new().with_page_index().build();
Arc::new(SerializedFileReader::new_with_options(
chunk_reader,
options,
)?)
} else {
Arc::new(SerializedFileReader::new(chunk_reader)?)
};
Ok(Self::new_with_options(file_reader, options))
}

@@ -298,12 +307,15 @@ impl Iterator for ParquetRecordBatchReader {
continue;
}

// try to read record
let to_read = match front.row_count.checked_sub(self.batch_size) {
Some(remaining) => {
selection.push_front(RowSelection::skip(remaining));
Some(remaining) if remaining != 0 => {
                        // When the selection holds more rows than the batch size, read one
                        // full batch and push the remainder back as a new `select` selection.
                        // The `remaining != 0` guard avoids pushing an empty selection and
                        // looping forever.
selection.push_front(RowSelection::select(remaining));
self.batch_size
}
None => front.row_count,
_ => front.row_count,
};

break to_read;
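The effect of this arm is that a single `select` entry larger than the batch size is consumed across several reads. A standalone sketch of that arithmetic (illustrative only; plain Rust, no parquet APIs):

// How one `select` selection of `row_count` rows is consumed when it is
// larger than `batch_size`: full batches first, then the remainder.
fn reads_for_selection(row_count: usize, batch_size: usize) -> Vec<usize> {
    let mut remaining = row_count;
    let mut reads = Vec::new();
    while remaining != 0 {
        let to_read = remaining.min(batch_size);
        reads.push(to_read);
        remaining -= to_read;
    }
    reads
}

fn main() {
    // A 50-row selection read with batch_size = 20 yields reads of 20, 20 and 10,
    // matching scenario 4 in the test below.
    assert_eq!(reads_for_selection(50, 20), vec![20, 20, 10]);
}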
@@ -390,6 +402,7 @@ mod tests {

use crate::arrow::arrow_reader::{
ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
ParquetRecordBatchReader, RowSelection,
};
use crate::arrow::buffer::converter::{
BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
@@ -1591,4 +1604,206 @@ mod tests {
test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
}

#[test]
fn test_scan_row_with_selection() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let test_file = File::open(&path).unwrap();

        // total row count: 7300
        // 1. test selection length greater than one page's row count
let batch_size = 1000;
let expected_data = create_expect_batch(&test_file, batch_size);

let selections = create_test_selection(batch_size, 7300, false);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);
let mut total_row_count = 0;
let mut index = 0;
for batch in skip_reader {
let batch = batch.unwrap();
assert_eq!(batch, expected_data.get(index).unwrap().clone());
index += 2;
let num = batch.num_rows();
assert!(num == batch_size || num == 300);
total_row_count += num;
}
assert_eq!(total_row_count, 4000);

let selections = create_test_selection(batch_size, 7300, true);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);
let mut total_row_count = 0;
let mut index = 1;
for batch in skip_reader {
let batch = batch.unwrap();
assert_eq!(batch, expected_data.get(index).unwrap().clone());
index += 2;
let num = batch.num_rows();
            // the last batch will be 300
assert!(num == batch_size || num == 300);
total_row_count += num;
}
assert_eq!(total_row_count, 3300);

        // 2. test selection length less than one page's row count
let batch_size = 20;
let expected_data = create_expect_batch(&test_file, batch_size);
let selections = create_test_selection(batch_size, 7300, false);

let skip_reader = create_skip_reader(&test_file, batch_size, selections);
let mut total_row_count = 0;
let mut index = 0;
for batch in skip_reader {
let batch = batch.unwrap();
assert_eq!(batch, expected_data.get(index).unwrap().clone());
index += 2;
let num = batch.num_rows();
assert_eq!(num, batch_size);
total_row_count += num;
}
assert_eq!(total_row_count, 3660);

let selections = create_test_selection(batch_size, 7300, true);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);
let mut total_row_count = 0;
let mut index = 1;
for batch in skip_reader {
let batch = batch.unwrap();
assert_eq!(batch, expected_data.get(index).unwrap().clone());
index += 2;
let num = batch.num_rows();
assert_eq!(num, batch_size);
total_row_count += num;
}
assert_eq!(total_row_count, 3640);

// 3. test selection_len less than batch_size
let batch_size = 20;
let selection_len = 5;
let expected_data_batch = create_expect_batch(&test_file, batch_size);
let expected_data_selection = create_expect_batch(&test_file, selection_len);
let selections = create_test_selection(selection_len, 7300, false);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);

let mut total_row_count = 0;

for batch in skip_reader {
let batch = batch.unwrap();
let num = batch.num_rows();
assert!(num == batch_size || num == selection_len);
if num == batch_size {
assert_eq!(
batch,
expected_data_batch
.get(total_row_count / batch_size)
.unwrap()
.clone()
);
total_row_count += batch_size;
} else if num == selection_len {
assert_eq!(
batch,
expected_data_selection
.get(total_row_count / selection_len)
.unwrap()
.clone()
);
total_row_count += selection_len;
}
// add skip offset
total_row_count += selection_len;
}

// 4. test selection_len more than batch_size
        // If batch_size < selection_len, a selection(50, read) is split into
        // selection(20, read), selection(20, read), selection(10, read)
let batch_size = 20;
let selection_len = 50;
let another_batch_size = 10;
let expected_data_batch = create_expect_batch(&test_file, batch_size);
let expected_data_batch2 = create_expect_batch(&test_file, another_batch_size);
let selections = create_test_selection(selection_len, 7300, false);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);

let mut total_row_count = 0;

for batch in skip_reader {
let batch = batch.unwrap();
let num = batch.num_rows();
assert!(num == batch_size || num == another_batch_size);
if num == batch_size {
assert_eq!(
batch,
expected_data_batch
.get(total_row_count / batch_size)
.unwrap()
.clone()
);
total_row_count += batch_size;
} else if num == another_batch_size {
assert_eq!(
batch,
expected_data_batch2
.get(total_row_count / another_batch_size)
.unwrap()
.clone()
);
total_row_count += 10;
// add skip offset
total_row_count += selection_len;
}
}

fn create_skip_reader(
test_file: &File,
batch_size: usize,
selections: Vec<RowSelection>,
) -> ParquetRecordBatchReader {
let arrow_reader_options =
ArrowReaderOptions::new().with_row_selection(selections);

let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options(
test_file.try_clone().unwrap(),
arrow_reader_options,
)
.unwrap();
skip_arrow_reader.get_record_reader(batch_size).unwrap()
}

fn create_test_selection(
step_len: usize,
total_len: usize,
skip_first: bool,
) -> Vec<RowSelection> {
let mut remaining = total_len;
let mut skip = skip_first;
let mut vec = vec![];
while remaining != 0 {
let step = if remaining > step_len {
step_len
} else {
remaining
};
vec.push(RowSelection {
row_count: step,
skip,
});
remaining -= step;
skip = !skip;
}
vec
}
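        // For example, create_test_selection(1000, 7300, false) yields
        //   select 1000, skip 1000, select 1000, skip 1000, select 1000, skip 1000,
        //   select 1000, skip 300
        // i.e. 4000 selected rows (matching the first scenario's assertion); with
        // skip_first = true the selected total is 3 * 1000 + 300 = 3300.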

fn create_expect_batch(test_file: &File, batch_size: usize) -> Vec<RecordBatch> {
let mut serial_arrow_reader =
ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap();
let serial_reader =
serial_arrow_reader.get_record_reader(batch_size).unwrap();
let mut expected_data = vec![];
for batch in serial_reader {
expected_data.push(batch.unwrap());
}
expected_data
}
}
}
