Skip to content

Commit

Permalink
Add public API for decoding parquet footer (#1804)
Browse files Browse the repository at this point in the history
* Add public API for decoding parquet footer

* Review feedback
  • Loading branch information
tustvold committed Jun 7, 2022
1 parent 96a4d6c commit a439f7f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 75 deletions.
32 changes: 11 additions & 21 deletions parquet/src/arrow/async_reader.rs
Expand Up @@ -77,13 +77,12 @@

use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::{Cursor, SeekFrom};
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use byteorder::{ByteOrder, LittleEndian};
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
Expand All @@ -99,10 +98,10 @@ use crate::arrow::ProjectionMask;
use crate::basic::Compression;
use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer::parse_metadata_buffer;
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::SerializedPageReader;
use crate::file::PARQUET_MAGIC;
use crate::file::FOOTER_SIZE;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};

/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read parquet files
Expand Down Expand Up @@ -134,30 +133,21 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
}

fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
async move {
self.seek(SeekFrom::End(-8)).await?;
self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

let mut buf = [0_u8; 8];
let mut buf = [0_u8; FOOTER_SIZE];
self.read_exact(&mut buf).await?;

if buf[4..] != PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}

let metadata_len = LittleEndian::read_i32(&buf[..4]) as i64;
if metadata_len < 0 {
return Err(general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata_len
));
}

self.seek(SeekFrom::End(-8 - metadata_len)).await?;
let metadata_len = decode_footer(&buf)?;
self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
.await?;

let mut buf = Vec::with_capacity(metadata_len as usize + 8);
let mut buf = Vec::with_capacity(metadata_len);
self.read_to_end(&mut buf).await?;

Ok(Arc::new(parse_metadata_buffer(&mut Cursor::new(buf))?))
Ok(Arc::new(decode_metadata(&buf)?))
}
.boxed()
}
Expand Down
97 changes: 48 additions & 49 deletions parquet/src/file/footer.rs
Expand Up @@ -15,11 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{
cmp::min,
io::{Cursor, Read, Seek, SeekFrom},
sync::Arc,
};
use std::{io::Read, sync::Arc};

use byteorder::{ByteOrder, LittleEndian};
use parquet_format::{ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData};
Expand All @@ -28,10 +24,7 @@ use thrift::protocol::TCompactInputProtocol;
use crate::basic::ColumnOrder;

use crate::errors::{ParquetError, Result};
use crate::file::{
metadata::*, reader::ChunkReader, DEFAULT_FOOTER_READ_SIZE, FOOTER_SIZE,
PARQUET_MAGIC,
};
use crate::file::{metadata::*, reader::ChunkReader, FOOTER_SIZE, PARQUET_MAGIC};

use crate::schema::types::{self, SchemaDescriptor};

Expand All @@ -52,55 +45,42 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
));
}

// read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
let default_end_len = min(DEFAULT_FOOTER_READ_SIZE, chunk_reader.len() as usize);
let mut default_end_reader = chunk_reader
.get_read(chunk_reader.len() - default_end_len as u64, default_end_len)?;
let mut default_len_end_buf = vec![0; default_end_len];
default_end_reader.read_exact(&mut default_len_end_buf)?;
let mut footer = [0_u8; 8];
chunk_reader
.get_read(file_size - 8, 8)?
.read_exact(&mut footer)?;

// check this is indeed a parquet file
if default_len_end_buf[default_end_len - 4..] != PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}
let metadata_len = decode_footer(&footer)?;
let footer_metadata_len = FOOTER_SIZE + metadata_len;

// get the metadata length from the footer
let metadata_len = LittleEndian::read_i32(
&default_len_end_buf[default_end_len - 8..default_end_len - 4],
) as i64;
if metadata_len < 0 {
if footer_metadata_len > file_size as usize {
return Err(general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata_len
"Invalid Parquet file. Reported metadata length of {} + {} byte footer, but file is only {} bytes",
metadata_len,
FOOTER_SIZE,
file_size
));
}
let footer_metadata_len = FOOTER_SIZE + metadata_len as usize;

// build up the reader covering the entire metadata
let mut default_end_cursor = Cursor::new(default_len_end_buf);
if footer_metadata_len > file_size as usize {
return Err(general_err!(
"Invalid Parquet file. Metadata start is less than zero ({})",
file_size as i64 - footer_metadata_len as i64
let mut metadata = Vec::with_capacity(metadata_len);

let read = chunk_reader
.get_read(file_size - footer_metadata_len as u64, metadata_len)?
.read_to_end(&mut metadata)?;

if read != metadata_len {
return Err(eof_err!(
"Expected to read {} bytes of metadata, got {}",
metadata_len,
read
));
} else if footer_metadata_len < DEFAULT_FOOTER_READ_SIZE {
// the whole metadata is in the bytes we already read
default_end_cursor.seek(SeekFrom::End(-(footer_metadata_len as i64)))?;
parse_metadata_buffer(&mut default_end_cursor)
} else {
// the end of file read by default is not long enough, read missing bytes
let complementary_end_read = chunk_reader.get_read(
file_size - footer_metadata_len as u64,
FOOTER_SIZE + metadata_len as usize - default_end_len,
)?;
parse_metadata_buffer(&mut complementary_end_read.chain(default_end_cursor))
}

decode_metadata(&metadata)
}

/// Reads [`ParquetMetaData`] from the provided [`Read`] starting at the readers current position
pub(crate) fn parse_metadata_buffer<T: Read + ?Sized>(
metadata_read: &mut T,
) -> Result<ParquetMetaData> {
/// Decodes [`ParquetMetaData`] from the provided bytes
pub fn decode_metadata(metadata_read: &[u8]) -> Result<ParquetMetaData> {
// TODO: row group filtering
let mut prot = TCompactInputProtocol::new(metadata_read);
let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
Expand All @@ -124,6 +104,23 @@ pub(crate) fn parse_metadata_buffer<T: Read + ?Sized>(
Ok(ParquetMetaData::new(file_metadata, row_groups))
}

/// Decodes the footer returning the metadata length in bytes
pub fn decode_footer(slice: &[u8; FOOTER_SIZE]) -> Result<usize> {
// check this is indeed a parquet file
if slice[4..] != PARQUET_MAGIC {
return Err(general_err!("Invalid Parquet file. Corrupt footer"));
}

// get the metadata length from the footer
let metadata_len = LittleEndian::read_i32(&slice[..4]);
metadata_len.try_into().map_err(|_| {
general_err!(
"Invalid Parquet file. Metadata length is less than zero ({})",
metadata_len
)
})
}

/// Parses column orders from Thrift definition.
/// If no column orders are defined, returns `None`.
fn parse_column_orders(
Expand Down Expand Up @@ -209,7 +206,9 @@ mod tests {
assert!(reader_result.is_err());
assert_eq!(
reader_result.err().unwrap(),
general_err!("Invalid Parquet file. Metadata start is less than zero (-255)")
general_err!(
"Invalid Parquet file. Reported metadata length of 255 + 8 byte footer, but file is only 8 bytes"
)
);
}

Expand Down
8 changes: 3 additions & 5 deletions parquet/src/file/mod.rs
Expand Up @@ -105,8 +105,6 @@ pub mod serialized_reader;
pub mod statistics;
pub mod writer;

const FOOTER_SIZE: usize = 8;
pub(crate) const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];

/// The number of bytes read at the end of the parquet file on first read
const DEFAULT_FOOTER_READ_SIZE: usize = 64 * 1024;
/// The length of the parquet footer in bytes
pub const FOOTER_SIZE: usize = 8;
const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];

0 comments on commit a439f7f

Please sign in to comment.