diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index f54e6797ed2..7d0f4bfed9a 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -55,6 +55,7 @@ mod builder; mod byte_array; mod byte_array_dictionary; mod dictionary_buffer; +mod empty_array; mod offset_buffer; #[cfg(test)] @@ -97,6 +98,9 @@ pub trait RowGroupCollection { /// Get schema of parquet file. fn schema(&self) -> Result; + /// Get the numer of rows in this collection + fn num_rows(&self) -> usize; + /// Returns an iterator over the column chunks for particular column fn column_chunks(&self, i: usize) -> Result>; } @@ -106,6 +110,10 @@ impl RowGroupCollection for Arc { Ok(self.metadata().file_metadata().schema_descr_ptr()) } + fn num_rows(&self) -> usize { + self.metadata().file_metadata().num_rows() as usize + } + fn column_chunks(&self, column_index: usize) -> Result> { let iterator = FilePageIterator::new(column_index, Arc::clone(self))?; Ok(Box::new(iterator)) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index ba3ba355827..2836c699c39 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType as ArrowType, Field, IntervalUnit, Schema, SchemaRef}; +use crate::arrow::array_reader::empty_array::make_empty_array_reader; use crate::arrow::array_reader::{ make_byte_array_dictionary_reader, make_byte_array_reader, ArrayReader, ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, @@ -37,7 +38,7 @@ use crate::data_type::{ Int96Type, }; use crate::errors::ParquetError::ArrowError; -use crate::errors::{ParquetError, Result}; +use crate::errors::{Result}; use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr}; use crate::schema::visitor::TypeVisitor; @@ -64,10 +65,6 @@ where filtered_root_names.insert(root.name().to_string()); } - if leaves.is_empty() { - return Err(general_err!("Can't build array reader without columns!")); - } - // Only pass root fields that take part in the projection // to avoid traversal of columns that are not read. // TODO: also prune unread parts of the tree in child structures @@ -412,10 +409,10 @@ impl<'a> ArrayReaderBuilder { fn build_array_reader(&mut self) -> Result> { let context = ArrayReaderBuilderContext::default(); - self.visit_struct(self.root_schema.clone(), &context) - .and_then(|reader_opt| { - reader_opt.ok_or_else(|| general_err!("Failed to build array reader!")) - }) + match self.visit_struct(self.root_schema.clone(), &context)? { + Some(reader) => Ok(reader), + None => Ok(make_empty_array_reader(self.row_groups.num_rows())), + } } // Utility functions diff --git a/parquet/src/arrow/array_reader/empty_array.rs b/parquet/src/arrow/array_reader/empty_array.rs new file mode 100644 index 00000000000..54b77becba0 --- /dev/null +++ b/parquet/src/arrow/array_reader/empty_array.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::ArrayReader; +use crate::errors::Result; +use arrow::array::{ArrayDataBuilder, ArrayRef, StructArray}; +use arrow::datatypes::DataType as ArrowType; +use std::any::Any; +use std::sync::Arc; + +/// Returns an [`ArrayReader`] that yields [`StructArray`] with no columns +/// but with row counts that correspond to the amount of data in the file +/// +/// This is useful for when projection eliminates all columns within a collection +pub fn make_empty_array_reader(row_count: usize) -> Box { + Box::new(EmptyArrayReader::new(row_count)) +} + +struct EmptyArrayReader { + data_type: ArrowType, + remaining_rows: usize, +} + +impl EmptyArrayReader { + pub fn new(row_count: usize) -> Self { + Self { + data_type: ArrowType::Struct(vec![]), + remaining_rows: row_count, + } + } +} + +impl ArrayReader for EmptyArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result { + let len = self.remaining_rows.min(batch_size); + self.remaining_rows -= len; + + let data = ArrayDataBuilder::new(self.data_type.clone()) + .len(len) + .build() + .unwrap(); + + Ok(Arc::new(StructArray::from(data))) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + None + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + None + } +} diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 80323e59fa5..7675707e357 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -19,17 +19,18 @@ use std::sync::Arc; +use arrow::array::Array; use arrow::datatypes::{DataType as ArrowType, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; use arrow::record_batch::{RecordBatch, RecordBatchReader}; use arrow::{array::StructArray, error::ArrowError}; -use crate::arrow::array_reader::{build_array_reader, ArrayReader, StructArrayReader}; +use crate::arrow::array_reader::{build_array_reader, ArrayReader}; use crate::arrow::schema::parquet_to_arrow_schema; use crate::arrow::schema::{ parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns, }; -use crate::errors::{ParquetError, Result}; +use crate::errors::Result; use crate::file::metadata::{KeyValue, ParquetMetaData}; use crate::file::reader::FileReader; @@ -234,20 +235,10 @@ impl Iterator for ParquetRecordBatchReader { "Struct array reader should return struct array".to_string(), ) }); + match struct_array { Err(err) => Some(Err(err)), - Ok(e) => { - match RecordBatch::try_new(self.schema.clone(), e.columns_ref()) { - Err(err) => Some(Err(err)), - Ok(record_batch) => { - if record_batch.num_rows() > 0 { - Some(Ok(record_batch)) - } else { - None - } - } - } - } + Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))), } } } @@ -265,12 +256,6 @@ impl ParquetRecordBatchReader { batch_size: usize, array_reader: Box, ) -> Result { - // Check that array reader is struct array reader - array_reader - .as_any() - .downcast_ref::() - .ok_or_else(|| general_err!("The input must be struct array reader!"))?; - let schema = match array_reader.get_data_type() { ArrowType::Struct(ref fields) => Schema::new(fields.clone()), _ => unreachable!("Struct array reader's data type is not struct!"), @@ -1386,4 +1371,26 @@ mod tests { schema_without_metadata.as_ref() ); } + + #[test] + fn test_empty_projection() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{}/alltypes_plain.parquet", testdata); + let file = File::open(&path).unwrap(); + let reader = SerializedFileReader::try_from(file).unwrap(); + let expected_rows = reader.metadata().file_metadata().num_rows() as usize; + + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); + let batch_reader = arrow_reader.get_record_reader_by_columns([], 2).unwrap(); + + let mut total_rows = 0; + for maybe_batch in batch_reader { + let batch = maybe_batch.unwrap(); + total_rows += batch.num_rows(); + assert_eq!(batch.num_columns(), 0); + assert!(batch.num_rows() <= 2); + } + + assert_eq!(total_rows, expected_rows); + } } diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index b8fafec1e7c..7bf3ebfa92a 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -345,6 +345,7 @@ impl Stream input, InMemoryRowGroup { schema: metadata.file_metadata().schema_descr_ptr(), + row_count: row_group_metadata.num_rows() as usize, column_chunks, }, )) @@ -419,6 +420,7 @@ async fn read_footer( struct InMemoryRowGroup { schema: SchemaDescPtr, column_chunks: Vec>, + row_count: usize, } impl RowGroupCollection for InMemoryRowGroup { @@ -426,6 +428,10 @@ impl RowGroupCollection for InMemoryRowGroup { Ok(self.schema.clone()) } + fn num_rows(&self) -> usize { + self.row_count + } + fn column_chunks(&self, i: usize) -> Result> { let page_reader = self.column_chunks[i].as_ref().unwrap().pages();