Add integration test for scan rows with selection #2158

Merged
merged 9 commits into from Jul 27, 2022
3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/byte_array.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::buffer::offset_buffer::OffsetBuffer;
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::GenericRecordReader;
@@ -120,6 +120,7 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}

3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -25,7 +25,7 @@ use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowNativeType, DataType as ArrowType};

use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::buffer::{
dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer,
};
@@ -181,6 +181,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}

8 changes: 7 additions & 1 deletion parquet/src/arrow/array_reader/complex_object_array.rs
@@ -166,7 +166,13 @@ where
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
match self.column_reader.as_mut() {
Some(reader) => reader.skip_records(num_records),
None => Ok(0),
None => {
// No column reader yet: a skip can arrive before any read,
// so advance to the next column reader before skipping.
if self.next_column_reader()? {
self.column_reader.as_mut().unwrap().skip_records(num_records)
} else {
Ok(0)
}
}
}
}

27 changes: 27 additions & 0 deletions parquet/src/arrow/array_reader/mod.rs
@@ -144,3 +144,30 @@
}
Ok(records_read)
}

/// Uses `pages` to set up `record_reader`'s `column_reader`.
///
/// If records are skipped before any read operation, the `column_reader`
/// must first be set via `set_page_reader` so that `def_level_decoder`
/// and `rep_level_decoder` are constructed.
fn set_column_reader<V, CV>(
record_reader: &mut GenericRecordReader<V, CV>,
pages: &mut dyn PageIterator,
) -> Result<bool>
where
V: ValuesBuffer + Default,
CV: ColumnValueDecoder<Slice = V::Slice>,
{
if record_reader.column_reader().is_none() {
// If we skip records before any read operation,
// we need to set `column_reader` via `set_page_reader`.
if let Some(page_reader) = pages.next() {
record_reader.set_page_reader(page_reader?)?;
Ok(true)
} else {
Ok(false)
}
} else {
Ok(true)
}
}
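
For context, a hypothetical usage sketch (not part of the diff; it assumes in-crate visibility of `RowSelection`'s fields, as used in the tests below): a selection that starts with a skip run means `skip_records` is the first call a reader receives, before any page has been loaded, which is exactly when this helper must lazily fetch the first page.

// A selection beginning with a skip run: `skip_records` runs before any
// read, so `column_reader` would still be `None` without this helper.
let selection = vec![
RowSelection { row_count: 100, skip: true },  // skipped first
RowSelection { row_count: 100, skip: false }, // then read
];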
3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/null_array.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::array_reader::{read_records, ArrayReader, set_column_reader};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::column::page::PageIterator;
@@ -97,6 +97,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}

3 changes: 2 additions & 1 deletion parquet/src/arrow/array_reader/primitive_array.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, ArrayReader};
use crate::arrow::array_reader::{read_records, set_column_reader, ArrayReader};
use crate::arrow::record_reader::buffer::ScalarValue;
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
@@ -222,6 +222,7 @@ where
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
set_column_reader(&mut self.record_reader, self.pages.as_mut())?;
self.record_reader.skip_records(num_records)
}

223 changes: 219 additions & 4 deletions parquet/src/arrow/arrow_reader.rs
@@ -33,6 +33,7 @@ use crate::arrow::ProjectionMask;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
use crate::file::serialized_reader::ReadOptionsBuilder;
use crate::schema::types::SchemaDescriptor;

/// Arrow reader api.
@@ -217,7 +218,15 @@ impl ParquetFileArrowReader {
chunk_reader: R,
options: ArrowReaderOptions,
) -> Result<Self> {
let file_reader = Arc::new(SerializedFileReader::new(chunk_reader)?);
let file_reader = if options.selection.is_some() {
let options = ReadOptionsBuilder::new().with_page_index().build();
Arc::new(SerializedFileReader::new_with_options(
chunk_reader,
options,
)?)
} else {
Arc::new(SerializedFileReader::new(chunk_reader)?)
};
Ok(Self::new_with_options(file_reader, options))
}
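
A usage sketch mirroring the test helper added later in this diff (`file`, `selections`, and `batch_size` are placeholders; assumes a function returning `Result`): supplying a row selection through `ArrowReaderOptions` now causes the reader to be built with the page index enabled, which the skip machinery needs.

// Build a reader that honors a row selection; the page index is
// loaded automatically because `options.selection` is set.
let options = ArrowReaderOptions::new().with_row_selection(selections);
let mut reader = ParquetFileArrowReader::try_new_with_options(file, options)?;
let batches = reader.get_record_reader(batch_size)?; // ParquetRecordBatchReader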

@@ -298,12 +307,15 @@ impl Iterator for ParquetRecordBatchReader {
continue;
}

// try to read records
let to_read = match front.row_count.checked_sub(self.batch_size) {
Some(remaining) => {
selection.push_front(RowSelection::skip(remaining));
Some(remaining) if remaining != 0 => {
// If the front selection holds more rows than batch_size, read a full
// batch and push the remainder back as a select run; the `remaining != 0`
// guard avoids re-queuing an empty selection, which could loop forever.
Member Author commented:
Fix the wrong logic: the remaining records need to be read, not skipped.

selection.push_front(RowSelection::select(remaining));
self.batch_size
}
None => front.row_count,
_ => front.row_count,
};

break to_read;
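
A self-contained sketch (not part of the diff) of the splitting behavior this match implements: a select run longer than `batch_size` is consumed in full batches, with the remainder re-queued as a select rather than a skip.

// Simulate how a select run of `row_count` rows is split into reads.
fn split_reads(mut row_count: usize, batch_size: usize) -> Vec<usize> {
let mut reads = Vec::new();
loop {
match row_count.checked_sub(batch_size) {
Some(remaining) if remaining != 0 => {
reads.push(batch_size); // read a full batch
row_count = remaining;  // remainder stays selected
}
_ => {
reads.push(row_count); // final (possibly short) batch
break;
}
}
}
reads
}

// split_reads(50, 20) == [20, 20, 10], matching test case 4 below.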
@@ -390,6 +402,7 @@ mod tests {

use crate::arrow::arrow_reader::{
ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
ParquetRecordBatchReader, RowSelection,
};
use crate::arrow::buffer::converter::{
BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
@@ -1586,4 +1599,206 @@
test_row_group_batch(MIN_BATCH_SIZE, MIN_BATCH_SIZE - 1);
test_row_group_batch(MIN_BATCH_SIZE - 1, MIN_BATCH_SIZE);
}

#[test]
fn test_scan_row_with_selection() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let test_file = File::open(&path).unwrap();

// total row count is 7300
// 1. test a selection length greater than one page's row count
let batch_size = 1000;
let expected_data = create_expect_batch(&test_file, batch_size);

let selections = create_test_selection(batch_size, 7300, false);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);
let mut total_row_count = 0;
let mut index = 0;
for batch in skip_reader {
let batch = batch.unwrap();
assert_eq!(batch, expected_data.get(index).unwrap().clone());
index += 2;
let num = batch.num_rows();
assert!(num == batch_size || num == 300);
total_row_count += num;
}
assert_eq!(total_row_count, 4000);

let selections = create_test_selection(batch_size, 7300, true);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);
let mut total_row_count = 0;
let mut index = 1;
for batch in skip_reader {
let batch = batch.unwrap();
assert_eq!(batch, expected_data.get(index).unwrap().clone());
index += 2;
let num = batch.num_rows();
// the last batch will be 300
assert!(num == batch_size || num == 300);
total_row_count += num;
}
assert_eq!(total_row_count, 3300);

// 2. test a selection length less than one page's row count
let batch_size = 20;
let expected_data = create_expect_batch(&test_file, batch_size);
let selections = create_test_selection(batch_size, 7300, false);

let skip_reader = create_skip_reader(&test_file, batch_size, selections);
let mut total_row_count = 0;
let mut index = 0;
for batch in skip_reader {
let batch = batch.unwrap();
assert_eq!(batch, expected_data.get(index).unwrap().clone());
index += 2;
let num = batch.num_rows();
assert_eq!(num, batch_size);
total_row_count += num;
}
assert_eq!(total_row_count, 3660);

let selections = create_test_selection(batch_size, 7300, true);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);
let mut total_row_count = 0;
let mut index = 1;
for batch in skip_reader {
let batch = batch.unwrap();
assert_eq!(batch, expected_data.get(index).unwrap().clone());
index += 2;
let num = batch.num_rows();
assert_eq!(num, batch_size);
total_row_count += num;
}
assert_eq!(total_row_count, 3640);

// 3. test selection_len less than batch_size
let batch_size = 20;
let selection_len = 5;
let expected_data_batch = create_expect_batch(&test_file, batch_size);
let expected_data_selection = create_expect_batch(&test_file, selection_len);
let selections = create_test_selection(selection_len, 7300, false);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);

let mut total_row_count = 0;

for batch in skip_reader {
let batch = batch.unwrap();
let num = batch.num_rows();
assert!(num == batch_size || num == selection_len);
if num == batch_size {
assert_eq!(
batch,
expected_data_batch
.get(total_row_count / batch_size)
.unwrap()
.clone()
);
total_row_count += batch_size;
} else if num == selection_len {
assert_eq!(
batch,
expected_data_selection
.get(total_row_count / selection_len)
.unwrap()
.clone()
);
total_row_count += selection_len;
}
// add skip offset
total_row_count += selection_len;
}

// 4. test selection_len greater than batch_size
// If batch_size < selection_len, selection(50, read) is divided into
// selection(20, read), selection(20, read), selection(10, read)
let batch_size = 20;
let selection_len = 50;
let another_batch_size = 10;
let expected_data_batch = create_expect_batch(&test_file, batch_size);
let expected_data_batch2 = create_expect_batch(&test_file, another_batch_size);
let selections = create_test_selection(selection_len, 7300, false);
let skip_reader = create_skip_reader(&test_file, batch_size, selections);

let mut total_row_count = 0;

for batch in skip_reader {
let batch = batch.unwrap();
let num = batch.num_rows();
assert!(num == batch_size || num == another_batch_size);
if num == batch_size {
assert_eq!(
batch,
expected_data_batch
.get(total_row_count / batch_size)
.unwrap()
.clone()
);
total_row_count += batch_size;
} else if num == another_batch_size {
assert_eq!(
batch,
expected_data_batch2
.get(total_row_count / another_batch_size)
.unwrap()
.clone()
);
total_row_count += 10;
// add skip offset
total_row_count += selection_len;
}
}

fn create_skip_reader(
test_file: &File,
batch_size: usize,
selections: Vec<RowSelection>,
) -> ParquetRecordBatchReader {
let arrow_reader_options =
ArrowReaderOptions::new().with_row_selection(selections);

let mut skip_arrow_reader = ParquetFileArrowReader::try_new_with_options(
test_file.try_clone().unwrap(),
arrow_reader_options,
)
.unwrap();
skip_arrow_reader.get_record_reader(batch_size).unwrap()
}

fn create_test_selection(
step_len: usize,
total_len: usize,
skip_first: bool,
) -> Vec<RowSelection> {
let mut remaining = total_len;
let mut skip = skip_first;
let mut vec = vec![];
while remaining != 0 {
let step = if remaining > step_len {
step_len
} else {
remaining
};
vec.push(RowSelection {
row_count: step,
skip,
});
remaining -= step;
skip = !skip;
}
vec
}
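
As a worked check (a sketch, not part of the diff), the row counts asserted above can be re-derived from this helper's alternating select/skip runs:

// Count the rows selected by create_test_selection's output.
fn selected_rows(step_len: usize, total_len: usize, skip_first: bool) -> usize {
let mut remaining = total_len;
let mut skip = skip_first;
let mut selected = 0;
while remaining != 0 {
let step = remaining.min(step_len);
if !skip {
selected += step;
}
remaining -= step;
skip = !skip;
}
selected
}

// selected_rows(1000, 7300, false) == 4000 and selected_rows(1000, 7300, true) == 3300 (case 1);
// selected_rows(20, 7300, false) == 3660 and selected_rows(20, 7300, true) == 3640 (case 2).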

fn create_expect_batch(test_file: &File, batch_size: usize) -> Vec<RecordBatch> {
let mut serial_arrow_reader =
ParquetFileArrowReader::try_new(test_file.try_clone().unwrap()).unwrap();
let serial_reader =
serial_arrow_reader.get_record_reader(batch_size).unwrap();
let mut expected_data = vec![];
for batch in serial_reader {
expected_data.push(batch.unwrap());
}
expected_data
}
}
}