Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add page index reader test and support empty index. #2011

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 22 additions & 23 deletions arrow-flight/src/arrow.flight.protocol.rs
Expand Up @@ -229,7 +229,7 @@ pub mod flight_service_client {
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
T::ResponseBody: Default + Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
pub fn new(inner: T) -> Self {
Expand All @@ -242,7 +242,6 @@ pub mod flight_service_client {
) -> FlightServiceClient<InterceptedService<T, F>>
where
F: tonic::service::Interceptor,
T::ResponseBody: Default,
T: tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
Response = http::Response<
Expand Down Expand Up @@ -279,9 +278,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoStreamingRequest<Message = super::HandshakeRequest>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::HandshakeResponse>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::HandshakeResponse>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -308,9 +307,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoRequest<super::Criteria>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::FlightInfo>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::FlightInfo>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand Down Expand Up @@ -389,9 +388,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoRequest<super::Ticket>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::FlightData>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::FlightData>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -418,9 +417,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoStreamingRequest<Message = super::FlightData>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::PutResult>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::PutResult>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -446,9 +445,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoStreamingRequest<Message = super::FlightData>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::FlightData>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::FlightData>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -475,9 +474,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoRequest<super::Action>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::Result>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::Result>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand All @@ -501,9 +500,9 @@ pub mod flight_service_client {
&mut self,
request: impl tonic::IntoRequest<super::Empty>,
) -> Result<
tonic::Response<tonic::codec::Streaming<super::ActionType>>,
tonic::Status,
> {
tonic::Response<tonic::codec::Streaming<super::ActionType>>,
tonic::Status,
> {
self.inner
.ready()
.await
Expand Down
14 changes: 8 additions & 6 deletions parquet/src/file/metadata.rs
Expand Up @@ -55,8 +55,10 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
/// Page index for all pages in each column chunk
page_indexes: Option<Vec<Vec<Index>>>,
/// Offset index for all pages in each column chunk
offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
}

impl ParquetMetaData {
Expand All @@ -74,8 +76,8 @@ impl ParquetMetaData {
pub fn new_with_page_index(
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
page_indexes: Option<Vec<Index>>,
offset_indexes: Option<Vec<Vec<PageLocation>>>,
page_indexes: Option<Vec<Vec<Index>>>,
offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
) -> Self {
ParquetMetaData {
file_metadata,
Expand Down Expand Up @@ -107,12 +109,12 @@ impl ParquetMetaData {
}

/// Returns page indexes in this file.
pub fn page_indexes(&self) -> Option<&Vec<Index>> {
pub fn page_indexes(&self) -> Option<&Vec<Vec<Index>>> {
self.page_indexes.as_ref()
}

/// Returns offset indexes in this file.
pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
pub fn offset_indexes(&self) -> Option<&Vec<Vec<Vec<PageLocation>>>> {
self.offset_indexes.as_ref()
}
}
Expand Down
1 change: 1 addition & 0 deletions parquet/src/file/page_index/index.rs
Expand Up @@ -56,6 +56,7 @@ pub enum Index {
DOUBLE(NativeIndex<f64>),
BYTE_ARRAY(ByteArrayIndex),
FIXED_LEN_BYTE_ARRAY(ByteArrayIndex),
EMPTY_ARRAY(),
}

/// An index of a column of [`Type`] physical representation
Expand Down
11 changes: 4 additions & 7 deletions parquet/src/file/page_index/index_reader.rs
Expand Up @@ -101,13 +101,7 @@ fn get_index_offset_and_lengths(
.iter()
.map(|x| x.column_index_length())
.map(|maybe_length| {
let index_length = maybe_length.ok_or_else(|| {
ParquetError::General(
"The column_index_length must exist if offset_index_offset exists"
.to_string(),
)
})?;

let index_length = maybe_length.unwrap_or(0);
Ok(index_length.try_into().unwrap())
})
.collect::<Result<Vec<_>, ParquetError>>()?;
Expand Down Expand Up @@ -143,6 +137,9 @@ fn deserialize_column_index(
data: &[u8],
column_type: Type,
) -> Result<Index, ParquetError> {
if data.is_empty() {
return Ok(Index::EMPTY_ARRAY());
}
let mut d = Cursor::new(data);
let mut prot = TCompactInputProtocol::new(&mut d);

Expand Down