
Hadoop LZ4 Support for LZ4 Codec #3013

Merged
merged 8 commits into from Nov 6, 2022
70 changes: 70 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -2422,6 +2422,76 @@ mod tests {
assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
}

// This test ensures backward compatibility: it reads 2 files that use the LZ4 CompressionCodec
Contributor: Love the tests

// but different algorithms: LZ4_HADOOP and LZ4_RAW.
// 1. hadoop_lz4_compressed.parquet -> a file written with the LZ4 CompressionCodec using the
// LZ4_HADOOP algorithm for compression.
// 2. non_hadoop_lz4_compressed.parquet -> a file written with the LZ4 CompressionCodec using the
// LZ4_RAW algorithm for compression. Reading it relies on the fallback that keeps backward
// compatibility with older parquet-cpp versions.
//
// For more information, check: https://github.com/apache/arrow-rs/issues/2988
#[test]
fn test_read_lz4_hadoop_fallback() {
for file in [
"hadoop_lz4_compressed.parquet",
"non_hadoop_lz4_compressed.parquet",
] {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/{}", testdata, file);
let file = File::open(&path).unwrap();
let expected_rows = 4;

let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();
assert_eq!(batches.len(), 1);
let batch = &batches[0];

assert_eq!(batch.num_columns(), 3);
assert_eq!(batch.num_rows(), expected_rows);

let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
assert_eq!(
a.values(),
&[1593604800, 1593604800, 1593604801, 1593604801]
);

let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
let b: Vec<_> = b.iter().flatten().collect();
assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
}
}
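
For readers unfamiliar with what separates the two files, a minimal sketch of the Hadoop LZ4 block framing (the LZ4_HADOOP case) follows. It is illustrative only: the big-endian length prefixes follow Hadoop's block layout, decompress_lz4_block is a hypothetical stand-in for a raw LZ4 block decompressor, and errors are reduced to strings. The LZ4_RAW case has no such framing, which is why the reader needs a fallback at all.

    // Sketch: decode Hadoop's LZ4 container. Each frame starts with the frame's
    // total uncompressed size (4 bytes, big endian); the payload then consists of
    // one or more inner blocks, each prefixed with its compressed size (4 bytes,
    // big endian) and holding a raw LZ4 block.
    fn read_be_u32(buf: &[u8]) -> Option<u32> {
        Some(u32::from_be_bytes(buf.get(0..4)?.try_into().ok()?))
    }

    fn decompress_hadoop_lz4(
        mut input: &[u8],
        // Hypothetical raw LZ4 block decompressor: (compressed block, max output size).
        decompress_lz4_block: impl Fn(&[u8], usize) -> Result<Vec<u8>, String>,
    ) -> Result<Vec<u8>, String> {
        let mut output = Vec::new();
        while !input.is_empty() {
            let frame_len = read_be_u32(input).ok_or("truncated frame header")? as usize;
            input = &input[4..];
            let mut remaining = frame_len;
            // Inner blocks until this frame's declared uncompressed size is reached.
            while remaining > 0 {
                let block_len = read_be_u32(input).ok_or("truncated block header")? as usize;
                input = &input[4..];
                let block = input.get(..block_len).ok_or("truncated block body")?;
                let decoded = decompress_lz4_block(block, remaining)?;
                if decoded.is_empty() || decoded.len() > remaining {
                    return Err("inner block does not fit the frame's declared size".to_string());
                }
                remaining -= decoded.len();
                output.extend_from_slice(&decoded);
                input = &input[block_len..];
            }
        }
        Ok(output)
    }
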

#[test]
fn test_read_lz4_hadoop_large() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/hadoop_lz4_compressed_larger.parquet", testdata);
let file = File::open(&path).unwrap();
let expected_rows = 10000;

let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();
assert_eq!(batches.len(), 1);
let batch = &batches[0];

assert_eq!(batch.num_columns(), 1);
assert_eq!(batch.num_rows(), expected_rows);

let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
let a: Vec<_> = a.iter().flatten().collect();
assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
}

#[test]
#[cfg(feature = "snap")]
fn test_read_nested_lists() {
32 changes: 25 additions & 7 deletions parquet/src/column/writer/mod.rs
@@ -24,7 +24,7 @@ use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::{
ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues,
};
use crate::compression::{create_codec, Codec};
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
@@ -221,7 +221,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
page_writer: Box<dyn PageWriter + 'a>,
) -> Self {
let codec = props.compression(descr.path());
let compressor = create_codec(codec).unwrap();
let codec_options = CodecOptionsBuilder::default().build();
let compressor = create_codec(codec, &codec_options).unwrap();
let encoder = E::try_new(&descr, props.as_ref()).unwrap();

let statistics_enabled = props.statistics_enabled(descr.path());
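
For downstream code, the writer-side change above boils down to passing codec options into create_codec. Below is a minimal sketch under two assumptions: the parquet::compression module is publicly reachable (the diff only shows the crate-internal path), and CodecOptionsBuilder exposes a set_backward_compatible_lz4 toggle mirroring the reader property used later in this diff.

    use parquet::basic::Compression;
    use parquet::compression::{create_codec, Codec, CodecOptionsBuilder};
    use parquet::errors::Result;

    // Round-trip a buffer through the LZ4 codec with explicitly built options.
    fn lz4_roundtrip(input: &[u8]) -> Result<Vec<u8>> {
        let options = CodecOptionsBuilder::default()
            // Assumed builder method; the column writer above just takes the defaults.
            .set_backward_compatible_lz4(true)
            .build();
        // `create_codec` yields None only for Compression::UNCOMPRESSED.
        let mut codec = create_codec(Compression::LZ4, &options)?
            .expect("LZ4 codec should be available when the `lz4` feature is enabled");
        let mut compressed = Vec::new();
        codec.compress(input, &mut compressed)?;
        let mut decompressed = Vec::new();
        codec.decompress(&compressed, &mut decompressed)?;
        Ok(decompressed)
    }
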
@@ -1107,7 +1108,8 @@ mod tests {
};
use crate::file::writer::TrackedWrite;
use crate::file::{
properties::WriterProperties, reader::SerializedPageReader,
properties::{ReaderProperties, WriterProperties},
reader::SerializedPageReader,
writer::SerializedPageWriter,
};
use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType};
@@ -1674,11 +1676,15 @@
assert_eq!(stats.null_count(), 0);
assert!(stats.distinct_count().is_none());

let reader = SerializedPageReader::new(
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
let reader = SerializedPageReader::new_with_properties(
Arc::new(Bytes::from(buf)),
&r.metadata,
r.rows_written as usize,
None,
Arc::new(props),
)
.unwrap();

@@ -1714,11 +1720,15 @@
let r = writer.close().unwrap();
assert!(r.metadata.statistics().is_none());

let reader = SerializedPageReader::new(
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
let reader = SerializedPageReader::new_with_properties(
Arc::new(Bytes::from(buf)),
&r.metadata,
r.rows_written as usize,
None,
Arc::new(props),
)
.unwrap();

@@ -1842,12 +1852,16 @@
let r = writer.close().unwrap();

// Read pages and check the sequence
let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
let mut page_reader = Box::new(
SerializedPageReader::new(
SerializedPageReader::new_with_properties(
Arc::new(file),
&r.metadata,
r.rows_written as usize,
None,
Arc::new(props),
)
.unwrap(),
);
@@ -2210,12 +2224,16 @@
assert_eq!(values_written, values.len());
let result = writer.close().unwrap();

let props = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
let page_reader = Box::new(
SerializedPageReader::new(
SerializedPageReader::new_with_properties(
Arc::new(file),
&result.metadata,
result.rows_written as usize,
None,
Arc::new(props),
)
.unwrap(),
);
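
The same three-line ReaderProperties setup now appears at each of the four updated call sites above; a hypothetical consolidation (not part of this PR) could build it once and hand the shared value to every SerializedPageReader::new_with_properties call:

    // Hypothetical helper, not in the PR: shared ReaderProperties that disable the
    // backward-compatible LZ4 fallback for these writer round-trip tests.
    fn no_lz4_fallback_props() -> Arc<ReaderProperties> {
        Arc::new(
            ReaderProperties::builder()
                .set_backward_compatible_lz4(false)
                .build(),
        )
    }

Each call site would then pass no_lz4_fallback_props() as the final argument instead of building the properties inline.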