From e51eca30fe052569de23a446981368ab444bf001 Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Mon, 13 Jun 2022 11:11:16 +0800 Subject: [PATCH 01/11] support compression for IPC --- arrow/Cargo.toml | 3 + arrow/src/ipc/compression/compression.rs | 117 ++++++++ arrow/src/ipc/compression/mod.rs | 21 ++ arrow/src/ipc/mod.rs | 1 + arrow/src/ipc/reader.rs | 150 ++++++++++- arrow/src/ipc/writer.rs | 325 ++++++++++++++++++++++- 6 files changed, 595 insertions(+), 22 deletions(-) create mode 100644 arrow/src/ipc/compression/compression.rs create mode 100644 arrow/src/ipc/compression/mod.rs diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index a9996b8b475..8878d4a607f 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -38,6 +38,9 @@ path = "src/lib.rs" bench = false [dependencies] +byteorder = "1" +lz4 = "1.23" +zstd = "0.11.1" serde = { version = "1.0" } serde_derive = "1.0" serde_json = { version = "1.0", features = ["preserve_order"] } diff --git a/arrow/src/ipc/compression/compression.rs b/arrow/src/ipc/compression/compression.rs new file mode 100644 index 00000000000..e229aae5d00 --- /dev/null +++ b/arrow/src/ipc/compression/compression.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use crate::error::Result;
+use crate::ipc::CompressionType;
+use std::io::{Read, Write};
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    ZSTD,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::ZSTD,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl Into<CompressionType> for CompressionCodecType {
+    fn into(self) -> CompressionType {
+        match self {
+            CompressionCodecType::NoCompression => CompressionType(-1),
+            CompressionCodecType::Lz4Frame => CompressionType::LZ4_FRAME,
+            CompressionCodecType::ZSTD => CompressionType::ZSTD,
+        }
+    }
+}
+
+impl CompressionCodecType {
+    pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+        match self {
+            CompressionCodecType::Lz4Frame => {
+                let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();
+                encoder.write_all(input).unwrap();
+                encoder.finish().1.unwrap();
+                Ok(())
+            }
+            CompressionCodecType::ZSTD => {
+                let mut encoder = zstd::Encoder::new(output, 0).unwrap();
+                encoder.write_all(input).unwrap();
+                encoder.finish().unwrap();
+                // Err(ArrowError::NotYetImplemented("Compression don't support the ZSTD".to_string()))
+                Ok(())
+            }
+            _ => Ok(()),
+        }
+    }
+
+    pub fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
+        let result: Result<usize> = match self {
+            CompressionCodecType::Lz4Frame => {
+                let mut decoder = lz4::Decoder::new(input)?;
+                let size = decoder.read_to_end(output).unwrap();
+                Ok(size)
+            }
+            CompressionCodecType::ZSTD => {
+                let mut decoder = zstd::Decoder::new(input)?;
+                let size = decoder.read_to_end(output).unwrap();
+                Ok(size)
+                // Err(ArrowError::NotYetImplemented("Compression don't support the ZSTD".to_string()))
+            }
+            _ => Ok(input.len()),
+        };
+        result
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::ipc::compression::compression::CompressionCodecType;
+
+    #[test]
+    fn test_lz4_compression() {
+        let input_bytes = "hello lz4".as_bytes();
+        let codec: CompressionCodecType = CompressionCodecType::Lz4Frame;
+        let mut output_bytes: Vec<u8> = Vec::new();
+        codec.compress(input_bytes, &mut output_bytes).unwrap();
+        let mut result_output_bytes: Vec<u8> = Vec::new();
+        codec
+            .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+            .unwrap();
+        assert_eq!(input_bytes, result_output_bytes.as_slice());
+    }
+
+    #[test]
+    fn test_zstd_compression() {
+        let input_bytes = "hello zstd".as_bytes();
+        let codec: CompressionCodecType = CompressionCodecType::ZSTD;
+        let mut output_bytes: Vec<u8> = Vec::new();
+        codec.compress(input_bytes, &mut output_bytes).unwrap();
+        let mut result_output_bytes: Vec<u8> = Vec::new();
+        codec
+            .decompress(output_bytes.as_slice(), &mut result_output_bytes)
+            .unwrap();
+        assert_eq!(input_bytes, result_output_bytes.as_slice());
+    }
+}
diff --git a/arrow/src/ipc/compression/mod.rs b/arrow/src/ipc/compression/mod.rs
new file mode 100644
index 00000000000..852a9ee2ce7
--- /dev/null
+++ b/arrow/src/ipc/compression/mod.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) mod compression; +pub(crate) const LENGTH_EMPTY_COMPRESSED_DATA: i64 = 0; +pub(crate) const LENGTH_NO_COMPRESSED_DATA: i64 = -1; +pub(crate) const LENGTH_OF_PREFIX_DATA: i64 = 8; diff --git a/arrow/src/ipc/mod.rs b/arrow/src/ipc/mod.rs index d5455b454e7..9b3f2149008 100644 --- a/arrow/src/ipc/mod.rs +++ b/arrow/src/ipc/mod.rs @@ -22,6 +22,7 @@ pub mod convert; pub mod reader; pub mod writer; +mod compression; #[allow(clippy::redundant_closure)] #[allow(clippy::needless_lifetimes)] #[allow(clippy::extra_unused_lifetimes)] diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 41c0c3293ac..7a0f2cce05c 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -20,6 +20,7 @@ //! The `FileReader` and `StreamReader` have similar interfaces, //! however the `FileReader` expects a reader that supports `Seek`ing +use byteorder::{ByteOrder, LittleEndian}; use std::collections::HashMap; use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; @@ -32,15 +33,73 @@ use crate::error::{ArrowError, Result}; use crate::ipc; use crate::record_batch::{RecordBatch, RecordBatchReader}; +use crate::ipc::compression::compression::CompressionCodecType; +use crate::ipc::compression::{ + LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA, +}; +use crate::ipc::CompressionType; use ipc::CONTINUATION_MARKER; use DataType::*; /// Read a buffer based on offset and length -fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer { +/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58 +/// Each constituent buffer is first compressed with the indicated +/// compressor, and then written with the uncompressed length in the first 8 +/// bytes as a 64-bit little-endian signed integer followed by the compressed +/// buffer bytes (and then padding as required by the protocol). The +/// uncompressed length may be set to -1 to indicate that the data that +/// follows is not compressed, which can be useful for cases where +/// compression does not yield appreciable savings. 
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    // corner case: empty buffer
+    if buf_data.is_empty() {
+        return Buffer::from(buf_data);
+    }
+    match compression_codec {
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            // 8byte + data
+            // read the first 8 bytes
+            // if the data is compressed, decompress the data, otherwise return as is
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // empty
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // not compressed
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut uncompressed_buffer = Vec::new();
+                let input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
+                // TODO consider the error result
+                compression_codec
+                    .decompress(input_data, &mut uncompressed_buffer)
+                    .unwrap();
+                Buffer::from(uncompressed_buffer)
+            }
+        }
+    }
+}
+
+/// Get the uncompressed length
+/// Notes:
+///   -1: indicates that the data that follows is not compressed
+///    0: indicates that there is no data
+///   positive number: indicates the uncompressed length of the following data
+fn read_uncompressed_size(buffer: &[u8]) -> i64 {
+    let len_buffer = &buffer[0..8];
+    // 64-bit little-endian signed integer
+    LittleEndian::read_i64(len_buffer)
+}
 
 /// Coordinates reading arrays based on data types.
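Aside: the prefix handling above can be exercised on its own. A minimal standalone sketch (illustrative only, not part of the patch; the frame/unframe names are hypothetical) of the Message.fbs convention that read_uncompressed_size implements:

    // Per-buffer framing: an i64 little-endian prefix carrying the
    // uncompressed length (-1 = payload stored raw, 0 = empty buffer),
    // followed by the possibly-compressed bytes.
    use std::convert::TryInto;

    fn frame(uncompressed_len: i64, payload: &[u8]) -> Vec<u8> {
        let mut out = Vec::with_capacity(8 + payload.len());
        out.extend_from_slice(&uncompressed_len.to_le_bytes());
        out.extend_from_slice(payload);
        out
    }

    fn unframe(buf: &[u8]) -> (i64, &[u8]) {
        let prefix = i64::from_le_bytes(buf[0..8].try_into().unwrap());
        (prefix, &buf[8..])
    }

    fn main() {
        // -1 signals that the payload that follows is not compressed
        let framed = frame(-1, b"raw bytes");
        let (prefix, payload) = unframe(&framed);
        assert_eq!(prefix, -1);
        assert_eq!(payload, &b"raw bytes"[..]);
    }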
@@ -60,6 +119,7 @@ fn create_array( dictionaries_by_id: &HashMap, mut node_index: usize, mut buffer_index: usize, + compression_codec: &CompressionCodecType, ) -> Result<(ArrayRef, usize, usize)> { use DataType::*; let data_type = field.data_type(); @@ -70,7 +130,7 @@ fn create_array( data_type, buffers[buffer_index..buffer_index + 3] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, compression_codec)) .collect(), ); node_index += 1; @@ -83,7 +143,7 @@ fn create_array( data_type, buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, compression_codec)) .collect(), ); node_index += 1; @@ -94,7 +154,7 @@ fn create_array( let list_node = &nodes[node_index]; let list_buffers: Vec = buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, compression_codec)) .collect(); node_index += 1; buffer_index += 2; @@ -106,6 +166,7 @@ fn create_array( dictionaries_by_id, node_index, buffer_index, + compression_codec, )?; node_index = triple.1; buffer_index = triple.2; @@ -116,7 +177,7 @@ fn create_array( let list_node = &nodes[node_index]; let list_buffers: Vec = buffers[buffer_index..=buffer_index] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, compression_codec)) .collect(); node_index += 1; buffer_index += 1; @@ -128,6 +189,7 @@ fn create_array( dictionaries_by_id, node_index, buffer_index, + compression_codec, )?; node_index = triple.1; buffer_index = triple.2; @@ -136,7 +198,8 @@ fn create_array( } Struct(struct_fields) => { let struct_node = &nodes[node_index]; - let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data); + let null_buffer: Buffer = + read_buffer(&buffers[buffer_index], data, compression_codec); node_index += 1; buffer_index += 1; @@ -153,6 +216,7 @@ fn create_array( dictionaries_by_id, node_index, buffer_index, + compression_codec, )?; node_index = triple.1; buffer_index = triple.2; @@ -172,7 +236,7 @@ fn create_array( let index_node = &nodes[node_index]; let index_buffers: Vec = buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, compression_codec)) .collect(); let dict_id = field.dict_id().ok_or_else(|| { @@ -202,13 +266,15 @@ fn create_array( let len = union_node.length() as usize; let type_ids: Buffer = - read_buffer(&buffers[buffer_index], data)[..len].into(); + read_buffer(&buffers[buffer_index], data, compression_codec)[..len] + .into(); buffer_index += 1; let value_offsets = match mode { UnionMode::Dense => { - let buffer = read_buffer(&buffers[buffer_index], data); + let buffer = + read_buffer(&buffers[buffer_index], data, compression_codec); buffer_index += 1; Some(buffer[..len * 4].into()) } @@ -226,6 +292,7 @@ fn create_array( dictionaries_by_id, node_index, buffer_index, + compression_codec, )?; node_index = triple.1; @@ -264,7 +331,7 @@ fn create_array( data_type, buffers[buffer_index..buffer_index + 2] .iter() - .map(|buf| read_buffer(buf, data)) + .map(|buf| read_buffer(buf, data, compression_codec)) .collect(), ); node_index += 1; @@ -589,6 +656,17 @@ pub fn read_record_batch( let field_nodes = batch.nodes().ok_or_else(|| { ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string()) })?; + // TODO check the compression body logical + let option_compression = batch.compression(); + let compression_codec = match option_compression { + None => 
CompressionCodecType::NoCompression, + Some(compression) => match compression.codec() { + CompressionType::ZSTD => CompressionCodecType::ZSTD, + CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame, + _ => CompressionCodecType::NoCompression, + }, + }; + // keep track of buffer and node index, the functions that create arrays mutate these let mut buffer_index = 0; let mut node_index = 0; @@ -607,6 +685,7 @@ pub fn read_record_batch( dictionaries_by_id, node_index, buffer_index, + &compression_codec, )?; node_index = triple.1; buffer_index = triple.2; @@ -640,6 +719,7 @@ pub fn read_record_batch( dictionaries_by_id, node_index, buffer_index, + &compression_codec, )?; node_index = triple.1; buffer_index = triple.2; @@ -888,7 +968,6 @@ impl FileReader { let mut block_data = vec![0; meta_len as usize]; self.reader.read_exact(&mut block_data)?; - let message = ipc::root_as_message(&block_data[..]).map_err(|err| { ArrowError::IoError(format!("Unable to get root as footer: {:?}", err)) })?; @@ -1378,6 +1457,53 @@ mod tests { }); } + #[test] + fn read_generated_streams_200() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + // the next batch must be empty + assert!(reader.next().is_none()); + // the stream must indicate that it's finished + assert!(reader.is_finished()); + }); + } + + #[test] + fn read_generated_files_200() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + fn create_test_projection_schema() -> Schema { // define field types let list_data_type = diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index ffeeadc9d99..1c7eb1e2d99 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -20,6 +20,7 @@ //! The `FileWriter` and `StreamWriter` have similar interfaces, //! 
however the `FileWriter` expects a reader that supports `Seek`ing
 
+use byteorder::{LittleEndian, WriteBytesExt};
 use std::collections::HashMap;
 use std::io::{BufWriter, Write};
 
@@ -36,6 +37,11 @@ use crate::ipc;
 use crate::record_batch::RecordBatch;
 use crate::util::bit_util;
 
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
 use ipc::CONTINUATION_MARKER;
 
 /// IPC write options used to control the behaviour of the writer
@@ -55,9 +61,50 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    batch_compression_type: CompressionCodecType,
 }
 
 impl IpcWriteOptions {
+    pub fn try_new_with_compression(
+        alignment: usize,
+        write_legacy_ipc_format: bool,
+        metadata_version: ipc::MetadataVersion,
+        batch_compression_type: CompressionCodecType,
+    ) -> Result<Self> {
+        if alignment == 0 || alignment % 8 != 0 {
+            return Err(ArrowError::InvalidArgumentError(
+                "Alignment should be greater than 0 and be a multiple of 8".to_string(),
+            ));
+        }
+        match batch_compression_type {
+            CompressionCodecType::NoCompression => {}
+            _ => {
+                if metadata_version != ipc::MetadataVersion::V5 {
+                    return Err(ArrowError::InvalidArgumentError(
+                        "Buffer compression is only supported from metadata v5".to_string(),
+                    ));
+                }
+            }
+        };
+        match metadata_version {
+            ipc::MetadataVersion::V5 => {
+                if write_legacy_ipc_format {
+                    Err(ArrowError::InvalidArgumentError(
+                        "Legacy IPC format only supported on metadata version 4"
+                            .to_string(),
+                    ))
+                } else {
+                    Ok(Self {
+                        alignment,
+                        write_legacy_ipc_format,
+                        metadata_version,
+                        batch_compression_type,
+                    })
+                }
+            }
+            z => panic!("Unsupported ipc::MetadataVersion {:?}", z),
+        }
+    }
     /// Try create IpcWriteOptions, checking for incompatible settings
     pub fn try_new(
         alignment: usize,
@@ -79,6 +126,7 @@ impl IpcWriteOptions {
                 alignment,
                 write_legacy_ipc_format,
                 metadata_version,
+                batch_compression_type: CompressionCodecType::NoCompression,
             }),
             ipc::MetadataVersion::V5 => {
                 if write_legacy_ipc_format {
@@ -91,6 +139,7 @@ impl IpcWriteOptions {
                         alignment,
                         write_legacy_ipc_format,
                         metadata_version,
+                        batch_compression_type: CompressionCodecType::NoCompression,
                     })
                 }
             }
@@ -105,6 +154,7 @@ impl Default for IpcWriteOptions {
             alignment: 8,
             write_legacy_ipc_format: false,
             metadata_version: ipc::MetadataVersion::V5,
+            batch_compression_type: CompressionCodecType::NoCompression,
         }
     }
 }
@@ -326,6 +376,20 @@ impl IpcDataGenerator {
         let mut buffers: Vec<ipc::Buffer> = vec![];
         let mut arrow_data: Vec<u8> = vec![];
         let mut offset = 0;
+
+        // get the type of compression
+        let compression_codec = write_options.batch_compression_type;
+        let compression_type: CompressionType = compression_codec.into();
+        let compression = {
+            if compression_codec != CompressionCodecType::NoCompression {
+                let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
+                c.add_method(ipc::BodyCompressionMethod::BUFFER);
+                c.add_codec(compression_type);
+                Some(c.finish())
+            } else {
+                None
+            }
+        };
         for array in batch.columns() {
             let array_data = array.data();
             offset = write_array_data(
@@ -336,18 +400,25 @@ impl IpcDataGenerator {
                 offset,
                 array.len(),
                 array.null_count(),
+                &compression_codec,
             );
         }
+        // pad the tail of body data
+        let len = arrow_data.len();
+        let pad_len = pad_to_8(len as u32);
+        arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
 
         // write data
         let buffers = fbb.create_vector(&buffers);
        let
nodes = fbb.create_vector(&nodes); - let root = { let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb); batch_builder.add_length(batch.num_rows() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); + if let Some(c) = compression { + batch_builder.add_compression(c); + } let b = batch_builder.finish(); b.as_union_value() }; @@ -381,6 +452,19 @@ impl IpcDataGenerator { let mut buffers: Vec = vec![]; let mut arrow_data: Vec = vec![]; + // get the type of compression + let compression_codec = write_options.batch_compression_type; + let compression_type: CompressionType = compression_codec.into(); + let compression = { + if compression_codec != CompressionCodecType::NoCompression { + let mut c = ipc::BodyCompressionBuilder::new(&mut fbb); + c.add_method(ipc::BodyCompressionMethod::BUFFER); + c.add_codec(compression_type); + Some(c.finish()) + } else { + None + } + }; write_array_data( array_data, &mut buffers, @@ -389,8 +473,14 @@ impl IpcDataGenerator { 0, array_data.len(), array_data.null_count(), + &compression_codec, ); + // pad the tail of body data + let len = arrow_data.len(); + let pad_len = pad_to_8(len as u32); + arrow_data.extend_from_slice(&vec![0u8; pad_len][..]); + // write data let buffers = fbb.create_vector(&buffers); let nodes = fbb.create_vector(&nodes); @@ -400,6 +490,9 @@ impl IpcDataGenerator { batch_builder.add_length(array_data.len() as i64); batch_builder.add_nodes(nodes); batch_builder.add_buffers(buffers); + if let Some(c) = compression { + batch_builder.add_compression(c); + } batch_builder.finish() }; @@ -515,9 +608,12 @@ impl FileWriter { let data_gen = IpcDataGenerator::default(); let mut writer = BufWriter::new(writer); // write magic to header + let mut header_size: usize = 0; writer.write_all(&super::ARROW_MAGIC[..])?; + header_size += super::ARROW_MAGIC.len(); // create an 8-byte boundary after the header writer.write_all(&[0, 0])?; + header_size += 2; // write the schema, set the written bytes to the schema + header let encoded_message = data_gen.schema_to_bytes(schema, &write_options); let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?; @@ -525,7 +621,7 @@ impl FileWriter { writer, write_options, schema: schema.clone(), - block_offsets: meta + data + 8, + block_offsets: meta + data + header_size, dictionary_blocks: vec![], record_blocks: vec![], finished: false, @@ -858,6 +954,7 @@ fn write_array_data( offset: i64, num_rows: usize, null_count: usize, + compression_codec: &CompressionCodecType, ) -> i64 { let mut offset = offset; if !matches!(array_data.data_type(), DataType::Null) { @@ -885,11 +982,12 @@ fn write_array_data( Some(buffer) => buffer.clone(), }; - offset = write_buffer(&null_buffer, buffers, arrow_data, offset); + offset = + write_buffer(&null_buffer, buffers, arrow_data, offset, compression_codec); } array_data.buffers().iter().for_each(|buffer| { - offset = write_buffer(buffer, buffers, arrow_data, offset); + offset = write_buffer(buffer, buffers, arrow_data, offset, compression_codec); }); if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) { @@ -904,6 +1002,7 @@ fn write_array_data( offset, data_ref.len(), data_ref.null_count(), + &compression_codec, ); }); } @@ -912,19 +1011,67 @@ fn write_array_data( } /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector +/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58 +/// Each constituent buffer is first compressed with the indicated +/// 
compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
+    let origin_buffer_len = buffer.len();
+    let mut compression_buffer = Vec::<u8>::new();
+    let (data, uncompression_buffer_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // this buffer_len will not used in the following logic
+            // If we don't use the compression, just write the data in the array
+            (buffer.as_slice(), origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer.as_slice(), 0)
+            } else {
+                compression_codec
+                    .compress(buffer.as_slice(), &mut compression_buffer)
+                    .unwrap();
+                if compression_buffer.len() > origin_buffer_len {
+                    // the length of compressed data is larger than uncompressed data
+                    // use the uncompressed data with -1
+                    // -1 indicate that we don't compress the data
+                    (buffer.as_slice(), LENGTH_NO_COMPRESSED_DATA)
+                } else {
+                    // use the compressed data with uncompressed length
+                    (compression_buffer.as_slice(), origin_buffer_len as i64)
+                }
+            }
+        }
+    };
+    let len = data.len() as i64;
+    // TODO: don't need to pad each buffer, and just need to pad the tail of the message body
+    // let pad_len = pad_to_8(len as u32);
+    // let total_len: i64 = (len + pad_len) as i64;
     // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer.as_slice());
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
+    let total_len = if compression_codec == &CompressionCodecType::NoCompression {
+        buffers.push(ipc::Buffer::new(offset, len));
+        len
+    } else {
+        buffers.push(ipc::Buffer::new(offset, len + LENGTH_OF_PREFIX_DATA));
+        // write the prefix of the uncompressed length
+        arrow_data
+            .write_i64::<LittleEndian>(uncompression_buffer_len)
+            .unwrap();
+        len + LENGTH_OF_PREFIX_DATA
+    };
+    arrow_data.extend_from_slice(data);
+
+    // arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
     offset + total_len
 }
 
 /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes
@@ -950,6 +1097,164 @@ mod tests {
     use crate::ipc::reader::*;
     use crate::util::integration_util::*;
 
+    #[test]
+    fn test_write_with_empty_record_batch() {
+        let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
+        let values: Vec<Option<i32>> = vec![];
+        let array = Int32Array::from(values);
+        let record_batch =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)])
+                .unwrap();
+        {
+            let file =
+                File::create("target/debug/testdata/arrow_lz4.arrow_file").unwrap();
+            let write_option = IpcWriteOptions::try_new_with_compression(
+                8,
+                false,
+                ipc::MetadataVersion::V5,
+                CompressionCodecType::Lz4Frame,
+            )
+            .unwrap();
+            let mut writer =
+                FileWriter::try_new_with_options(file, &schema, write_option).unwrap();
+            writer.write(&record_batch).unwrap();
+            writer.finish().unwrap();
+        }
+        {
+            // read file
+            let file =
+                File::open(format!("target/debug/testdata/{}.arrow_file",
"arrow_lz4")) + .unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + loop { + match reader.next() { + Some(Ok(read_batch)) => { + read_batch + .columns() + .iter() + .zip(record_batch.columns()) + .for_each(|(a, b)| { + assert_eq!(a.data_type(), b.data_type()); + assert_eq!(a.len(), b.len()); + assert_eq!(a.null_count(), b.null_count()); + }); + } + Some(Err(e)) => { + panic!("{}", e); + } + None => { + break; + } + } + } + } + } + #[test] + fn test_write_file_with_lz4_compression() { + let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]); + let values: Vec> = vec![Some(12), Some(1)]; + let array = Int32Array::from(values); + let record_batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]) + .unwrap(); + { + let file = + File::create("target/debug/testdata/arrow_lz4.arrow_file").unwrap(); + let write_option = IpcWriteOptions::try_new_with_compression( + 8, + false, + ipc::MetadataVersion::V5, + CompressionCodecType::Lz4Frame, + ) + .unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &schema, write_option).unwrap(); + writer.write(&record_batch).unwrap(); + writer.finish().unwrap(); + } + { + // read file + let file = + File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_lz4")) + .unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + loop { + match reader.next() { + Some(Ok(read_batch)) => { + read_batch + .columns() + .iter() + .zip(record_batch.columns()) + .for_each(|(a, b)| { + assert_eq!(a.data_type(), b.data_type()); + assert_eq!(a.len(), b.len()); + assert_eq!(a.null_count(), b.null_count()); + }); + } + Some(Err(e)) => { + panic!("{}", e); + } + None => { + break; + } + } + } + } + } + + #[test] + fn test_write_file_with_zstd_compression() { + let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]); + let values: Vec> = vec![Some(12), Some(1)]; + let array = Int32Array::from(values); + let record_batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]) + .unwrap(); + { + let file = + File::create("target/debug/testdata/arrow_zstd.arrow_file").unwrap(); + let write_option = IpcWriteOptions::try_new_with_compression( + 8, + false, + ipc::MetadataVersion::V5, + CompressionCodecType::ZSTD, + ) + .unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &schema, write_option).unwrap(); + writer.write(&record_batch).unwrap(); + writer.finish().unwrap(); + } + { + // read file + let file = + File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_zstd")) + .unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + loop { + match reader.next() { + Some(Ok(read_batch)) => { + read_batch + .columns() + .iter() + .zip(record_batch.columns()) + .for_each(|(a, b)| { + assert_eq!(a.data_type(), b.data_type()); + assert_eq!(a.len(), b.len()); + assert_eq!(a.null_count(), b.null_count()); + }); + } + Some(Err(e)) => { + panic!("{}", e); + } + None => { + break; + } + } + } + } + } + #[test] fn test_write_file() { let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]); From 67e7de5d5c64eb26f185379e634ca29f15aef50f Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Sun, 19 Jun 2022 00:36:07 +0800 Subject: [PATCH 02/11] fix conflict --- arrow/Cargo.toml | 2 +- arrow/src/ipc/reader.rs | 2 +- arrow/src/ipc/writer.rs | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 6579c002380..d06682170c8 100644 --- a/arrow/Cargo.toml 
+++ b/arrow/Cargo.toml @@ -63,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false } bitflags = { version = "1.2.1", default-features = false } [features] -default = ["csv", "ipc", "test_utils"] +default = ["csv", "ipc", "test_utils", "lz4", "zstd"] csv = ["csv_crate"] ipc = ["flatbuffers"] simd = ["packed_simd"] diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 2ce29024ba1..d1f1f438f11 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -273,7 +273,7 @@ fn create_array( // In V4, union types has validity bitmap // In V5 and later, union types have no validity bitmap if metadata < &ipc::MetadataVersion::V5 { - read_buffer(&buffers[buffer_index], data); + read_buffer(&buffers[buffer_index], data, compression_codec); buffer_index += 1; } diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index dabbd70abc4..22ce05e0671 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -20,7 +20,7 @@ //! The `FileWriter` and `StreamWriter` have similar interfaces, //! however the `FileWriter` expects a reader that supports `Seek`ing -use byteorder::{LittleEndian, WriteBytesExt}; +use byteorder::{ByteOrder, LittleEndian}; use std::collections::HashMap; use std::io::{BufWriter, Write}; @@ -1074,9 +1074,9 @@ fn write_buffer( } else { buffers.push(ipc::Buffer::new(offset, len + LENGTH_OF_PREFIX_DATA)); // write the prefix of the uncompressed length - arrow_data - .write_i64::(uncompression_buffer_len) - .unwrap(); + let mut uncompression_len_buf = [0;8]; + LittleEndian::write_i64(&mut uncompression_len_buf, uncompression_buffer_len); + arrow_data.extend_from_slice(&uncompression_len_buf); len + LENGTH_OF_PREFIX_DATA }; arrow_data.extend_from_slice(data); From 58488a3fa24650dbd4089b6df34029eb0aa9202c Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Sun, 19 Jun 2022 01:03:52 +0800 Subject: [PATCH 03/11] edit toml --- arrow/Cargo.toml | 6 +++--- arrow/src/ipc/compression/compression.rs | 2 -- arrow/src/ipc/writer.rs | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index d06682170c8..d5828f37f5f 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -39,8 +39,8 @@ bench = false [dependencies] byteorder = { version = "1", default-features = false } -lz4 = { version = "1.23", default-features = false, optional = true } -zstd = { version = "0.11.1", optional = true, default-features = false } +lz4 = { version = "1.23", default-features = true} +zstd = { version = "0.11.1", default-features = true } serde = { version = "1.0", default-features = false } serde_derive = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false, features = ["preserve_order"] } @@ -63,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false } bitflags = { version = "1.2.1", default-features = false } [features] -default = ["csv", "ipc", "test_utils", "lz4", "zstd"] +default = ["csv", "ipc", "test_utils"] csv = ["csv_crate"] ipc = ["flatbuffers"] simd = ["packed_simd"] diff --git a/arrow/src/ipc/compression/compression.rs b/arrow/src/ipc/compression/compression.rs index e229aae5d00..935d5fcb26c 100644 --- a/arrow/src/ipc/compression/compression.rs +++ b/arrow/src/ipc/compression/compression.rs @@ -59,7 +59,6 @@ impl CompressionCodecType { let mut encoder = zstd::Encoder::new(output, 0).unwrap(); encoder.write_all(input).unwrap(); encoder.finish().unwrap(); - // Err(ArrowError::NotYetImplemented("Compression don't support the ZSTD".to_string())) 
Ok(()) } _ => Ok(()), @@ -77,7 +76,6 @@ impl CompressionCodecType { let mut decoder = zstd::Decoder::new(input)?; let size = decoder.read_to_end(output).unwrap(); Ok(size) - // Err(ArrowError::NotYetImplemented("Compression don't support the ZSTD".to_string())) } _ => Ok(input.len()), }; diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index 22ce05e0671..c59ff00ef77 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -1074,7 +1074,7 @@ fn write_buffer( } else { buffers.push(ipc::Buffer::new(offset, len + LENGTH_OF_PREFIX_DATA)); // write the prefix of the uncompressed length - let mut uncompression_len_buf = [0;8]; + let mut uncompression_len_buf = [0; 8]; LittleEndian::write_i64(&mut uncompression_len_buf, uncompression_buffer_len); arrow_data.extend_from_slice(&uncompression_len_buf); len + LENGTH_OF_PREFIX_DATA From 932e381b0c1ca70c57c7658bba6c6205b4d986f1 Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Sun, 19 Jun 2022 09:52:35 +0800 Subject: [PATCH 04/11] fix clippy and format --- .../{compression.rs => ipc_compression.rs} | 24 +++++++++---------- arrow/src/ipc/compression/mod.rs | 2 +- arrow/src/ipc/reader.rs | 6 ++--- arrow/src/ipc/writer.rs | 20 ++++++++-------- 4 files changed, 26 insertions(+), 26 deletions(-) rename arrow/src/ipc/compression/{compression.rs => ipc_compression.rs} (86%) diff --git a/arrow/src/ipc/compression/compression.rs b/arrow/src/ipc/compression/ipc_compression.rs similarity index 86% rename from arrow/src/ipc/compression/compression.rs rename to arrow/src/ipc/compression/ipc_compression.rs index 935d5fcb26c..0067a93c85e 100644 --- a/arrow/src/ipc/compression/compression.rs +++ b/arrow/src/ipc/compression/ipc_compression.rs @@ -23,25 +23,25 @@ use std::io::{Read, Write}; pub enum CompressionCodecType { NoCompression, Lz4Frame, - ZSTD, + Zstd, } impl From for CompressionCodecType { fn from(compression_type: CompressionType) -> Self { match compression_type { - CompressionType::ZSTD => CompressionCodecType::ZSTD, + CompressionType::ZSTD => CompressionCodecType::Zstd, CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame, _ => CompressionCodecType::NoCompression, } } } -impl Into for CompressionCodecType { - fn into(self) -> CompressionType { - match self { - CompressionCodecType::NoCompression => CompressionType(-1), - CompressionCodecType::Lz4Frame => CompressionType::LZ4_FRAME, - CompressionCodecType::ZSTD => CompressionType::ZSTD, +impl From for Option { + fn from(codec: CompressionCodecType) -> Self { + match codec { + CompressionCodecType::NoCompression => None, + CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME), + CompressionCodecType::Zstd => Some(CompressionType::ZSTD), } } } @@ -55,7 +55,7 @@ impl CompressionCodecType { encoder.finish().1.unwrap(); Ok(()) } - CompressionCodecType::ZSTD => { + CompressionCodecType::Zstd => { let mut encoder = zstd::Encoder::new(output, 0).unwrap(); encoder.write_all(input).unwrap(); encoder.finish().unwrap(); @@ -72,7 +72,7 @@ impl CompressionCodecType { let size = decoder.read_to_end(output).unwrap(); Ok(size) } - CompressionCodecType::ZSTD => { + CompressionCodecType::Zstd => { let mut decoder = zstd::Decoder::new(input)?; let size = decoder.read_to_end(output).unwrap(); Ok(size) @@ -85,7 +85,7 @@ impl CompressionCodecType { #[cfg(test)] mod tests { - use crate::ipc::compression::compression::CompressionCodecType; + use crate::ipc::compression::ipc_compression::CompressionCodecType; #[test] fn test_lz4_compression() { @@ -103,7 +103,7 @@ mod tests { #[test] fn 
test_zstd_compression() { let input_bytes = "hello zstd".as_bytes(); - let codec: CompressionCodecType = CompressionCodecType::ZSTD; + let codec: CompressionCodecType = CompressionCodecType::Zstd; let mut output_bytes: Vec = Vec::new(); codec.compress(input_bytes, &mut output_bytes).unwrap(); let mut result_output_bytes: Vec = Vec::new(); diff --git a/arrow/src/ipc/compression/mod.rs b/arrow/src/ipc/compression/mod.rs index 852a9ee2ce7..1cdac812c80 100644 --- a/arrow/src/ipc/compression/mod.rs +++ b/arrow/src/ipc/compression/mod.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -pub(crate) mod compression; +pub(crate) mod ipc_compression; pub(crate) const LENGTH_EMPTY_COMPRESSED_DATA: i64 = 0; pub(crate) const LENGTH_NO_COMPRESSED_DATA: i64 = -1; pub(crate) const LENGTH_OF_PREFIX_DATA: i64 = 8; diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index d1f1f438f11..a57c0211d07 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -33,7 +33,7 @@ use crate::error::{ArrowError, Result}; use crate::ipc; use crate::record_batch::{RecordBatch, RecordBatchOptions, RecordBatchReader}; -use crate::ipc::compression::compression::CompressionCodecType; +use crate::ipc::compression::ipc_compression::CompressionCodecType; use crate::ipc::compression::{ LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA, }; @@ -64,7 +64,7 @@ fn read_buffer( } match compression_codec { CompressionCodecType::NoCompression => Buffer::from(buf_data), - CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => { + CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => { // 8byte + data // read the first 8 bytes // if the data is compressed, decompress the data, otherwise return as is @@ -675,7 +675,7 @@ pub fn read_record_batch( let compression_codec = match option_compression { None => CompressionCodecType::NoCompression, Some(compression) => match compression.codec() { - CompressionType::ZSTD => CompressionCodecType::ZSTD, + CompressionType::ZSTD => CompressionCodecType::Zstd, CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame, _ => CompressionCodecType::NoCompression, }, diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index c59ff00ef77..81a17d8ae9d 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -37,7 +37,7 @@ use crate::ipc; use crate::record_batch::RecordBatch; use crate::util::bit_util; -use crate::ipc::compression::compression::CompressionCodecType; +use crate::ipc::compression::ipc_compression::CompressionCodecType; use crate::ipc::compression::{ LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA, }; @@ -379,12 +379,12 @@ impl IpcDataGenerator { // get the type of compression let compression_codec = write_options.batch_compression_type; - let compression_type: CompressionType = compression_codec.into(); + let compression_type: Option = compression_codec.into(); let compression = { - if compression_codec != CompressionCodecType::NoCompression { + if let Some(codec) = compression_type { let mut c = ipc::BodyCompressionBuilder::new(&mut fbb); c.add_method(ipc::BodyCompressionMethod::BUFFER); - c.add_codec(compression_type); + c.add_codec(codec); Some(c.finish()) } else { None @@ -455,12 +455,12 @@ impl IpcDataGenerator { // get the type of compression let compression_codec = write_options.batch_compression_type; - let compression_type: CompressionType = compression_codec.into(); + let compression_type: Option = 
compression_codec.into(); let compression = { - if compression_codec != CompressionCodecType::NoCompression { + if let Some(codec) = compression_type { let mut c = ipc::BodyCompressionBuilder::new(&mut fbb); c.add_method(ipc::BodyCompressionMethod::BUFFER); - c.add_codec(compression_type); + c.add_codec(codec); Some(c.finish()) } else { None @@ -1011,7 +1011,7 @@ fn write_array_data( offset, data_ref.len(), data_ref.null_count(), - &compression_codec, + compression_codec, write_options, ); }); @@ -1044,7 +1044,7 @@ fn write_buffer( // If we don't use the compression, just write the data in the array (buffer.as_slice(), origin_buffer_len as i64) } - CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => { + CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => { if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA { (buffer.as_slice(), 0) } else { @@ -1227,7 +1227,7 @@ mod tests { 8, false, ipc::MetadataVersion::V5, - CompressionCodecType::ZSTD, + CompressionCodecType::Zstd, ) .unwrap(); let mut writer = From fcc4f5f1824776eba4add3a190312dc7a44376a9 Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Sun, 19 Jun 2022 09:56:10 +0800 Subject: [PATCH 05/11] format doc --- arrow/src/ipc/reader.rs | 2 +- arrow/src/ipc/writer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index a57c0211d07..6c41e9099e3 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -42,7 +42,7 @@ use ipc::CONTINUATION_MARKER; use DataType::*; /// Read a buffer based on offset and length -/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58 +/// From /// Each constituent buffer is first compressed with the indicated /// compressor, and then written with the uncompressed length in the first 8 /// bytes as a 64-bit little-endian signed integer followed by the compressed diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index 81a17d8ae9d..131205dfc19 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -1021,7 +1021,7 @@ fn write_array_data( } /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector -/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58 +/// From /// Each constituent buffer is first compressed with the indicated /// compressor, and then written with the uncompressed length in the first 8 /// bytes as a 64-bit little-endian signed integer followed by the compressed From 5b0d711387ff5cab20630f2dec81e54c1ecf54c9 Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Sun, 19 Jun 2022 10:03:21 +0800 Subject: [PATCH 06/11] format code --- arrow/src/ipc/reader.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 6c41e9099e3..d9ebf55c2e7 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -670,7 +670,6 @@ pub fn read_record_batch( let field_nodes = batch.nodes().ok_or_else(|| { ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string()) })?; - // TODO check the compression body logical let option_compression = batch.compression(); let compression_codec = match option_compression { None => CompressionCodecType::NoCompression, From 0c610672a3e04de09fa6733f1d7d8d3114e43f10 Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Sun, 19 Jun 2022 16:23:47 +0800 Subject: [PATCH 07/11] add padding for tail of each buffer --- arrow/src/ipc/writer.rs | 116 
++++++++++++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 9 deletions(-) diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index 131205dfc19..d93c9ffd989 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -1064,25 +1064,22 @@ fn write_buffer( } }; let len = data.len() as i64; - // TODO: don't need to pad each buffer, and just need to pad the tail of the message body - // let pad_len = pad_to_8(len as u32); - // let total_len: i64 = (len + pad_len) as i64; - // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes"); let total_len = if compression_codec == &CompressionCodecType::NoCompression { buffers.push(ipc::Buffer::new(offset, len)); len } else { - buffers.push(ipc::Buffer::new(offset, len + LENGTH_OF_PREFIX_DATA)); + buffers.push(ipc::Buffer::new(offset, LENGTH_OF_PREFIX_DATA + len)); // write the prefix of the uncompressed length let mut uncompression_len_buf = [0; 8]; LittleEndian::write_i64(&mut uncompression_len_buf, uncompression_buffer_len); arrow_data.extend_from_slice(&uncompression_len_buf); - len + LENGTH_OF_PREFIX_DATA + LENGTH_OF_PREFIX_DATA + len }; arrow_data.extend_from_slice(data); - - // arrow_data.extend_from_slice(&vec![0u8; pad_len][..]); - offset + total_len + // padding and make offset 8 bytes aligned + let pad_len = pad_to_8(len as u32) as i64; + arrow_data.extend_from_slice(&vec![0u8; pad_len as usize][..]); + offset + total_len + pad_len } /// Calculate an 8-byte boundary and return the number of bytes needed to pad to 8 bytes @@ -1621,6 +1618,107 @@ mod tests { }); } + #[test] + fn read_and_rewrite_compression_files_200() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read and rewrite the file to a temp location + { + let file = File::create(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + // write IPC version 5 + let options = IpcWriteOptions::try_new_with_compression( + 8, + false, + ipc::MetadataVersion::V5, + CompressionCodecType::Lz4Frame, + ) + .unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + while let Some(Ok(batch)) = reader.next() { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + } + + let file = File::open(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + let mut reader = FileReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_and_rewrite_compression_stream_200() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "2.0.0-compression"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec!["generated_lz4", "generated_zstd"]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let reader = StreamReader::try_new(file, None).unwrap(); + + // read and rewrite the stream to a temp location + { + let file = 
File::create(format!( + "target/debug/testdata/{}-{}.stream", + version, path + )) + .unwrap(); + let options = IpcWriteOptions::try_new_with_compression( + 8, + false, + ipc::MetadataVersion::V5, + CompressionCodecType::Zstd, + ) + .unwrap(); + let mut writer = + StreamWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + reader.for_each(|batch| { + writer.write(&batch.unwrap()).unwrap(); + }); + writer.finish().unwrap(); + } + + let file = + File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) + .unwrap(); + let mut reader = StreamReader::try_new(file, None).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + /// Read gzipped JSON file fn read_gzip_json(version: &str, path: &str) -> ArrowJson { let testdata = crate::util::test_util::arrow_test_data(); From 9ac4b011b1eb3ae0360e19c3c90286874d9a052a Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Sun, 19 Jun 2022 22:21:29 +0800 Subject: [PATCH 08/11] try fix the arrow lz4 and zstd --- arrow/Cargo.toml | 6 +++--- arrow/src/ipc/compression/ipc_compression.rs | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index d5828f37f5f..62a98850d40 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -39,8 +39,8 @@ bench = false [dependencies] byteorder = { version = "1", default-features = false } -lz4 = { version = "1.23", default-features = true} -zstd = { version = "0.11.1", default-features = true } +lz4 = { version = "1.23", default-features = false, optional = true } +zstd = { version = "0.11.1", optional = true, default-features = false } serde = { version = "1.0", default-features = false } serde_derive = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false, features = ["preserve_order"] } @@ -63,7 +63,7 @@ multiversion = { version = "0.6.1", default-features = false } bitflags = { version = "1.2.1", default-features = false } [features] -default = ["csv", "ipc", "test_utils"] +default = ["csv", "ipc", "test_utils", "zstd", "lz4"] csv = ["csv_crate"] ipc = ["flatbuffers"] simd = ["packed_simd"] diff --git a/arrow/src/ipc/compression/ipc_compression.rs b/arrow/src/ipc/compression/ipc_compression.rs index 0067a93c85e..219f5435c34 100644 --- a/arrow/src/ipc/compression/ipc_compression.rs +++ b/arrow/src/ipc/compression/ipc_compression.rs @@ -49,12 +49,14 @@ impl From for Option { impl CompressionCodecType { pub fn compress(&self, input: &[u8], output: &mut Vec) -> Result<()> { match self { + #[cfg(any(feature = "lz4", test))] CompressionCodecType::Lz4Frame => { let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap(); encoder.write_all(input).unwrap(); encoder.finish().1.unwrap(); Ok(()) } + #[cfg(any(feature = "zstd", test))] CompressionCodecType::Zstd => { let mut encoder = zstd::Encoder::new(output, 0).unwrap(); encoder.write_all(input).unwrap(); @@ -67,15 +69,21 @@ impl CompressionCodecType { pub fn decompress(&self, input: &[u8], output: &mut Vec) -> Result { let result: Result = match self { + #[cfg(any(feature = "lz4", test))] CompressionCodecType::Lz4Frame => { let mut decoder = lz4::Decoder::new(input)?; - let size = decoder.read_to_end(output).unwrap(); - Ok(size) + match decoder.read_to_end(output) { + Ok(size) => Ok(size), + Err(e) => Err(e.into()), + } } + #[cfg(any(feature = "zstd", test))] CompressionCodecType::Zstd => { let mut decoder = 
zstd::Decoder::new(input)?; - let size = decoder.read_to_end(output).unwrap(); - Ok(size) + match decoder.read_to_end(output) { + Ok(size) => Ok(size), + Err(e) => Err(e.into()), + } } _ => Ok(input.len()), }; From 49edfc8ef9af66175a07cacac69b13646a50e161 Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Sun, 19 Jun 2022 23:28:12 +0800 Subject: [PATCH 09/11] add lz4,zstd for compression cfg --- arrow/src/ipc/compression/ipc_compression.rs | 79 ++++++++++---------- arrow/src/ipc/reader.rs | 9 ++- arrow/src/ipc/writer.rs | 10 ++- 3 files changed, 51 insertions(+), 47 deletions(-) diff --git a/arrow/src/ipc/compression/ipc_compression.rs b/arrow/src/ipc/compression/ipc_compression.rs index 219f5435c34..19f7e196483 100644 --- a/arrow/src/ipc/compression/ipc_compression.rs +++ b/arrow/src/ipc/compression/ipc_compression.rs @@ -15,9 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::error::Result; use crate::ipc::CompressionType; -use std::io::{Read, Write}; #[derive(Debug, Clone, Copy, PartialEq)] pub enum CompressionCodecType { @@ -46,48 +44,51 @@ impl From for Option { } } -impl CompressionCodecType { - pub fn compress(&self, input: &[u8], output: &mut Vec) -> Result<()> { - match self { - #[cfg(any(feature = "lz4", test))] - CompressionCodecType::Lz4Frame => { - let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap(); - encoder.write_all(input).unwrap(); - encoder.finish().1.unwrap(); - Ok(()) - } - #[cfg(any(feature = "zstd", test))] - CompressionCodecType::Zstd => { - let mut encoder = zstd::Encoder::new(output, 0).unwrap(); - encoder.write_all(input).unwrap(); - encoder.finish().unwrap(); - Ok(()) +#[cfg(any(feature = "zstd,lz4", test))] +mod compression_function { + use crate::error::Result; + use crate::ipc::compression::ipc_compression::CompressionCodecType; + use std::io::{Read, Write}; + + impl CompressionCodecType { + pub fn compress(&self, input: &[u8], output: &mut Vec) -> Result<()> { + match self { + CompressionCodecType::Lz4Frame => { + let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap(); + encoder.write_all(input).unwrap(); + encoder.finish().1.unwrap(); + Ok(()) + } + CompressionCodecType::Zstd => { + let mut encoder = zstd::Encoder::new(output, 0).unwrap(); + encoder.write_all(input).unwrap(); + encoder.finish().unwrap(); + Ok(()) + } + _ => Ok(()), } - _ => Ok(()), } - } - pub fn decompress(&self, input: &[u8], output: &mut Vec) -> Result { - let result: Result = match self { - #[cfg(any(feature = "lz4", test))] - CompressionCodecType::Lz4Frame => { - let mut decoder = lz4::Decoder::new(input)?; - match decoder.read_to_end(output) { - Ok(size) => Ok(size), - Err(e) => Err(e.into()), + pub fn decompress(&self, input: &[u8], output: &mut Vec) -> Result { + let result: Result = match self { + CompressionCodecType::Lz4Frame => { + let mut decoder = lz4::Decoder::new(input)?; + match decoder.read_to_end(output) { + Ok(size) => Ok(size), + Err(e) => Err(e.into()), + } } - } - #[cfg(any(feature = "zstd", test))] - CompressionCodecType::Zstd => { - let mut decoder = zstd::Decoder::new(input)?; - match decoder.read_to_end(output) { - Ok(size) => Ok(size), - Err(e) => Err(e.into()), + CompressionCodecType::Zstd => { + let mut decoder = zstd::Decoder::new(input)?; + match decoder.read_to_end(output) { + Ok(size) => Ok(size), + Err(e) => Err(e.into()), + } } - } - _ => Ok(input.len()), - }; - result + _ => Ok(input.len()), + }; + result + } } } diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs 
index d9ebf55c2e7..9774428a7b7 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -79,13 +79,14 @@ fn read_buffer( Buffer::from(data) } else { // decompress data using the codec - let mut uncompressed_buffer = Vec::new(); - let input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..]; + let mut _uncompressed_buffer = Vec::new(); + let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..]; // TODO consider the error result + #[cfg(any(feature = "zstd,lz4", test))] compression_codec - .decompress(input_data, &mut uncompressed_buffer) + .decompress(_input_data, &mut _uncompressed_buffer) .unwrap(); - Buffer::from(uncompressed_buffer) + Buffer::from(_uncompressed_buffer) } } } diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index d93c9ffd989..4d52df66ead 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -1037,7 +1037,7 @@ fn write_buffer( compression_codec: &CompressionCodecType, ) -> i64 { let origin_buffer_len = buffer.len(); - let mut compression_buffer = Vec::::new(); + let mut _compression_buffer = Vec::::new(); let (data, uncompression_buffer_len) = match compression_codec { CompressionCodecType::NoCompression => { // this buffer_len will not used in the following logic @@ -1048,17 +1048,19 @@ fn write_buffer( if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA { (buffer.as_slice(), 0) } else { + #[cfg(any(feature = "zstd,lz4", test))] compression_codec - .compress(buffer.as_slice(), &mut compression_buffer) + .compress(buffer.as_slice(), &mut _compression_buffer) .unwrap(); - if compression_buffer.len() > origin_buffer_len { + let compression_len = _compression_buffer.len(); + if compression_len > origin_buffer_len { // the length of compressed data is larger than uncompressed data // use the uncompressed data with -1 // -1 indicate that we don't compress the data (buffer.as_slice(), LENGTH_NO_COMPRESSED_DATA) } else { // use the compressed data with uncompressed length - (compression_buffer.as_slice(), origin_buffer_len as i64) + (_compression_buffer.as_slice(), origin_buffer_len as i64) } } } From 5d974b3fd00bf2017385bc3c0e7bf5f4ddfec8b6 Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Fri, 29 Jul 2022 16:26:35 +0800 Subject: [PATCH 10/11] add cfg for ipm_compression --- arrow/Cargo.toml | 8 +++++--- arrow/src/ipc/compression/ipc_compression.rs | 2 +- arrow/src/ipc/mod.rs | 1 + arrow/src/ipc/reader.rs | 15 +++++++++------ arrow/src/ipc/writer.rs | 11 ++++++----- 5 files changed, 22 insertions(+), 15 deletions(-) diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 62a98850d40..7ee21f3c8ac 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -38,9 +38,8 @@ path = "src/lib.rs" bench = false [dependencies] -byteorder = { version = "1", default-features = false } lz4 = { version = "1.23", default-features = false, optional = true } -zstd = { version = "0.11.1", optional = true, default-features = false } +zstd = { version = "0.11.1", default-features = false, optional = true } serde = { version = "1.0", default-features = false } serde_derive = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false, features = ["preserve_order"] } @@ -63,7 +62,8 @@ multiversion = { version = "0.6.1", default-features = false } bitflags = { version = "1.2.1", default-features = false } [features] -default = ["csv", "ipc", "test_utils", "zstd", "lz4"] +default = ["csv", "ipc", "test_utils"] +ipc_compression = ["zstd", "lz4"] csv = ["csv_crate"] ipc = ["flatbuffers"] simd = 
["packed_simd"] @@ -84,6 +84,8 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng criterion = { version = "0.3", default-features = false } flate2 = { version = "1", default-features = false, features = ["rust_backend"] } tempfile = { version = "3", default-features = false } +lz4 = { version = "1.23", default-features = false } +zstd = { version = "0.11", default-features = false } [build-dependencies] diff --git a/arrow/src/ipc/compression/ipc_compression.rs b/arrow/src/ipc/compression/ipc_compression.rs index 19f7e196483..8959d6da9b6 100644 --- a/arrow/src/ipc/compression/ipc_compression.rs +++ b/arrow/src/ipc/compression/ipc_compression.rs @@ -44,7 +44,7 @@ impl From for Option { } } -#[cfg(any(feature = "zstd,lz4", test))] +#[cfg(any(feature = "ipc_compression", test))] mod compression_function { use crate::error::Result; use crate::ipc::compression::ipc_compression::CompressionCodecType; diff --git a/arrow/src/ipc/mod.rs b/arrow/src/ipc/mod.rs index 9b3f2149008..2b30e72206c 100644 --- a/arrow/src/ipc/mod.rs +++ b/arrow/src/ipc/mod.rs @@ -23,6 +23,7 @@ pub mod reader; pub mod writer; mod compression; + #[allow(clippy::redundant_closure)] #[allow(clippy::needless_lifetimes)] #[allow(clippy::extra_unused_lifetimes)] diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs index 9774428a7b7..37bf193791f 100644 --- a/arrow/src/ipc/reader.rs +++ b/arrow/src/ipc/reader.rs @@ -20,7 +20,6 @@ //! The `FileReader` and `StreamReader` have similar interfaces, //! however the `FileReader` expects a reader that supports `Seek`ing -use byteorder::{ByteOrder, LittleEndian}; use std::collections::HashMap; use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; @@ -63,8 +62,9 @@ fn read_buffer( return Buffer::from(buf_data); } match compression_codec { - CompressionCodecType::NoCompression => Buffer::from(buf_data), - CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => { + CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd + if cfg!(feature = "ipc_compression") || cfg!(test) => + { // 8byte + data // read the first 8 bytes // if the data is compressed, decompress the data, otherwise return as is @@ -81,14 +81,17 @@ fn read_buffer( // decompress data using the codec let mut _uncompressed_buffer = Vec::new(); let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..]; - // TODO consider the error result - #[cfg(any(feature = "zstd,lz4", test))] + #[cfg(any(feature = "ipc_compression", test))] compression_codec .decompress(_input_data, &mut _uncompressed_buffer) .unwrap(); Buffer::from(_uncompressed_buffer) } } + CompressionCodecType::NoCompression => Buffer::from(buf_data), + _ => { + panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable"); + } } } @@ -100,7 +103,7 @@ fn read_buffer( fn read_uncompressed_size(buffer: &[u8]) -> i64 { let len_buffer = &buffer[0..8]; // 64-bit little-endian signed integer - LittleEndian::read_i64(len_buffer) + i64::from_le_bytes(len_buffer.try_into().unwrap()) } /// Coordinates reading arrays based on data types. diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs index 4d52df66ead..568535c611c 100644 --- a/arrow/src/ipc/writer.rs +++ b/arrow/src/ipc/writer.rs @@ -20,7 +20,6 @@ //! The `FileWriter` and `StreamWriter` have similar interfaces, //! 
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index 4d52df66ead..568535c611c 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -20,7 +20,6 @@
 //! The `FileWriter` and `StreamWriter` have similar interfaces,
 //! however the `FileWriter` expects a reader that supports `Seek`ing
 
-use byteorder::{ByteOrder, LittleEndian};
 use std::collections::HashMap;
 use std::io::{BufWriter, Write};
 
@@ -65,6 +64,7 @@ pub struct IpcWriteOptions {
 }
 
 impl IpcWriteOptions {
+    #[cfg(any(feature = "ipc_compression", test))]
     pub fn try_new_with_compression(
         alignment: usize,
         write_legacy_ipc_format: bool,
@@ -1047,8 +1047,8 @@ fn write_buffer(
         CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
             if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
                 (buffer.as_slice(), 0)
-            } else {
-                #[cfg(any(feature = "zstd,lz4", test))]
+            } else if cfg!(feature = "ipc_compression") || cfg!(test) {
+                #[cfg(any(feature = "ipc_compression", test))]
                 compression_codec
                     .compress(buffer.as_slice(), &mut _compression_buffer)
                     .unwrap();
@@ -1062,6 +1062,8 @@ fn write_buffer(
                 // use the compressed data with uncompressed length
                 (_compression_buffer.as_slice(), origin_buffer_len as i64)
             }
+            } else {
+                panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable");
             }
         }
     };
@@ -1072,8 +1074,7 @@ fn write_buffer(
     } else {
         buffers.push(ipc::Buffer::new(offset, LENGTH_OF_PREFIX_DATA + len));
         // write the prefix of the uncompressed length
-        let mut uncompression_len_buf = [0; 8];
-        LittleEndian::write_i64(&mut uncompression_len_buf, uncompression_buffer_len);
+        let uncompression_len_buf: [u8; 8] = uncompression_buffer_len.to_le_bytes();
         arrow_data.extend_from_slice(&uncompression_len_buf);
         LENGTH_OF_PREFIX_DATA + len
     };
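The gating pattern used in both reader.rs and writer.rs above deserves a note: `cfg!(feature = "ipc_compression")` folds to a compile-time bool, but both branches of the `if` must still type-check, so the call into the gated codec additionally needs the attribute form `#[cfg(...)]` to drop it (and its symbols) from non-feature builds; the underscore-prefixed locals keep those builds warning-free. A minimal sketch of the combined pattern, with hypothetical names:

    // Hypothetical stand-in for the feature-gated codec call.
    #[cfg(feature = "ipc_compression")]
    fn compress(input: &[u8], output: &mut Vec<u8>) {
        output.extend_from_slice(input); // real codec elided
    }

    fn write(buffer: &[u8]) -> Vec<u8> {
        // `_` prefix keeps the non-feature build warning-free.
        let mut _compressed = Vec::new();
        if cfg!(feature = "ipc_compression") {
            // The attribute removes the call entirely when the feature is off;
            // cfg!(...) alone would not, since both branches must still compile.
            #[cfg(feature = "ipc_compression")]
            compress(buffer, &mut _compressed);
            _compressed
        } else {
            panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable");
        }
    }

    fn main() {
        if cfg!(feature = "ipc_compression") {
            println!("{:?}", write(b"some bytes"));
        }
    }
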
From 7f90fb23e3143c6cd241648d5e01e3ccb69d0f5a Mon Sep 17 00:00:00 2001
From: liukun4515
Date: Mon, 1 Aug 2022 16:23:59 +0800
Subject: [PATCH 11/11] address comments

---
 arrow/src/ipc/compression/ipc_compression.rs | 34 +++++----
 arrow/src/ipc/reader.rs                      | 29 +++----
 arrow/src/ipc/writer.rs                      | 80 +++++++++++---------
 3 files changed, 77 insertions(+), 66 deletions(-)

diff --git a/arrow/src/ipc/compression/ipc_compression.rs b/arrow/src/ipc/compression/ipc_compression.rs
index 8959d6da9b6..3b7305c74b8 100644
--- a/arrow/src/ipc/compression/ipc_compression.rs
+++ b/arrow/src/ipc/compression/ipc_compression.rs
@@ -19,7 +19,6 @@ use crate::ipc::CompressionType;
 
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub enum CompressionCodecType {
-    NoCompression,
     Lz4Frame,
     Zstd,
 }
@@ -29,17 +28,18 @@ impl From<CompressionType> for CompressionCodecType {
         match compression_type {
             CompressionType::ZSTD => CompressionCodecType::Zstd,
             CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
-            _ => CompressionCodecType::NoCompression,
+            other_type => {
+                unimplemented!("Unsupported compression type: {:?}", other_type)
+            }
         }
     }
 }
 
-impl From<CompressionCodecType> for Option<CompressionType> {
+impl From<CompressionCodecType> for CompressionType {
     fn from(codec: CompressionCodecType) -> Self {
         match codec {
-            CompressionCodecType::NoCompression => None,
-            CompressionCodecType::Lz4Frame => Some(CompressionType::LZ4_FRAME),
-            CompressionCodecType::Zstd => Some(CompressionType::ZSTD),
+            CompressionCodecType::Lz4Frame => CompressionType::LZ4_FRAME,
+            CompressionCodecType::Zstd => CompressionType::ZSTD,
         }
     }
 }
@@ -54,18 +54,21 @@ mod compression_function {
         pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
             match self {
                 CompressionCodecType::Lz4Frame => {
-                    let mut encoder = lz4::EncoderBuilder::new().build(output).unwrap();
-                    encoder.write_all(input).unwrap();
-                    encoder.finish().1.unwrap();
-                    Ok(())
+                    let mut encoder = lz4::EncoderBuilder::new().build(output)?;
+                    encoder.write_all(input)?;
+                    match encoder.finish().1 {
+                        Ok(_) => Ok(()),
+                        Err(e) => Err(e.into()),
+                    }
                 }
                 CompressionCodecType::Zstd => {
-                    let mut encoder = zstd::Encoder::new(output, 0).unwrap();
-                    encoder.write_all(input).unwrap();
-                    encoder.finish().unwrap();
-                    Ok(())
+                    let mut encoder = zstd::Encoder::new(output, 0)?;
+                    encoder.write_all(input)?;
+                    match encoder.finish() {
+                        Ok(_) => Ok(()),
+                        Err(e) => Err(e.into()),
+                    }
                 }
-                _ => Ok(()),
             }
         }
 
@@ -85,7 +88,6 @@ mod compression_function {
                     Err(e) => Err(e.into()),
                 }
             }
-                _ => Ok(input.len()),
             };
             result
         }
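With the `unwrap()` calls replaced by `?` and the fallthrough arms removed, codec failures now propagate as `ArrowError` instead of panicking. A minimal round-trip sketch against the resulting API; since the compression module is declared crate-private (`mod compression;`), a path like the one below is only usable from inside the crate, e.g. in its own tests:

    use crate::error::Result;
    use crate::ipc::compression::ipc_compression::CompressionCodecType;

    fn roundtrip(codec: CompressionCodecType, input: &[u8]) -> Result<Vec<u8>> {
        let mut compressed = Vec::new();
        codec.compress(input, &mut compressed)?;
        let mut decompressed = Vec::new();
        codec.decompress(&compressed, &mut decompressed)?;
        Ok(decompressed)
    }

    #[test]
    fn lz4_and_zstd_roundtrip() -> Result<()> {
        let data = b"hello ipc compression";
        assert_eq!(roundtrip(CompressionCodecType::Lz4Frame, data)?, data);
        assert_eq!(roundtrip(CompressionCodecType::Zstd, data)?, data);
        Ok(())
    }
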
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index df9d14ccdb5..a586dacd785 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -52,7 +52,7 @@ use DataType::*;
 fn read_buffer(
     buf: &ipc::Buffer,
     a_data: &[u8],
-    compression_codec: &CompressionCodecType,
+    compression_codec: &Option<CompressionCodecType>,
 ) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
@@ -62,9 +62,7 @@ fn read_buffer(
         return Buffer::from(buf_data);
     }
     match compression_codec {
-        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd
-            if cfg!(feature = "ipc_compression") || cfg!(test) =>
-        {
+        Some(_decompressor) if cfg!(feature = "ipc_compression") || cfg!(test) => {
             // 8byte + data
             // read the first 8 bytes
             // if the data is compressed, decompress the data, otherwise return as is
@@ -79,16 +77,17 @@ fn read_buffer(
             Buffer::from(data)
         } else {
             // decompress data using the codec
-            let mut _uncompressed_buffer = Vec::new();
+            let mut _uncompressed_buffer =
+                Vec::with_capacity(decompressed_length as usize);
             let _input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..];
             #[cfg(any(feature = "ipc_compression", test))]
-            compression_codec
+            _decompressor
                 .decompress(_input_data, &mut _uncompressed_buffer)
                 .unwrap();
             Buffer::from(_uncompressed_buffer)
         }
     }
-        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        None => Buffer::from(buf_data),
         _ => {
             panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable");
         }
@@ -100,6 +99,7 @@ fn read_buffer(
 /// -1: indicate that the data that follows is not compressed
 /// 0: indicate that there is no data
 /// positive number: indicate the uncompressed length for the following data
+#[inline]
 fn read_uncompressed_size(buffer: &[u8]) -> i64 {
     let len_buffer = &buffer[0..8];
     // 64-bit little-endian signed integer
@@ -124,7 +124,7 @@ fn create_array(
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
     mut node_index: usize,
     mut buffer_index: usize,
-    compression_codec: &CompressionCodecType,
+    compression_codec: &Option<CompressionCodecType>,
     metadata: &ipc::MetadataVersion,
 ) -> Result<(ArrayRef, usize, usize)> {
     use DataType::*;
@@ -676,13 +676,16 @@ pub fn read_record_batch(
     })?;
     let option_compression = batch.compression();
     let compression_codec = match option_compression {
-        None => CompressionCodecType::NoCompression,
+        None => Ok(None),
         Some(compression) => match compression.codec() {
-            CompressionType::ZSTD => CompressionCodecType::Zstd,
-            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
-            _ => CompressionCodecType::NoCompression,
+            CompressionType::ZSTD => Ok(Some(CompressionCodecType::Zstd)),
+            CompressionType::LZ4_FRAME => Ok(Some(CompressionCodecType::Lz4Frame)),
+            other_type => Err(ArrowError::InvalidArgumentError(format!(
+                "Unsupported compression type: {:?}",
+                other_type
+            ))),
         },
-    };
+    }?;
 
     // keep track of buffer and node index, the functions that create arrays mutate these
     let mut buffer_index = 0;
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index d4bf7a33dde..1dc3c02ec2a 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -63,7 +63,7 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
-    batch_compression_type: CompressionCodecType,
+    batch_compression_type: Option<CompressionCodecType>,
 }
 
 impl IpcWriteOptions {
@@ -72,7 +72,7 @@ impl IpcWriteOptions {
         alignment: usize,
         write_legacy_ipc_format: bool,
         metadata_version: ipc::MetadataVersion,
-        batch_compression_type: CompressionCodecType,
+        batch_compression_type: Option<CompressionCodecType>,
     ) -> Result<Self> {
         if alignment == 0 || alignment % 8 != 0 {
             return Err(ArrowError::InvalidArgumentError(
             ));
         }
         match batch_compression_type {
-            CompressionCodecType::NoCompression => {}
+            None => {}
             _ => {
-                if metadata_version != ipc::MetadataVersion::V5 {
+                if metadata_version < ipc::MetadataVersion::V5 {
                     return Err(ArrowError::InvalidArgumentError(
-                        "Compress buffer just support from metadata v5".to_string(),
+                        "Compression only supported in metadata v5 and above".to_string(),
                     ));
                 }
             }
@@ -129,7 +129,7 @@ impl IpcWriteOptions {
                 alignment,
                 write_legacy_ipc_format,
                 metadata_version,
-                batch_compression_type: CompressionCodecType::NoCompression,
+                batch_compression_type: None,
             }),
             ipc::MetadataVersion::V5 => {
                 if write_legacy_ipc_format {
@@ -142,7 +142,7 @@ impl IpcWriteOptions {
                         alignment,
                         write_legacy_ipc_format,
                         metadata_version,
-                        batch_compression_type: CompressionCodecType::NoCompression,
+                        batch_compression_type: None,
                     })
                 }
             }
@@ -157,7 +157,7 @@ impl Default for IpcWriteOptions {
             alignment: 8,
             write_legacy_ipc_format: false,
             metadata_version: ipc::MetadataVersion::V5,
-            batch_compression_type: CompressionCodecType::NoCompression,
+            batch_compression_type: None,
         }
     }
 }
@@ -382,7 +382,8 @@ impl IpcDataGenerator {
         // get the type of compression
         let compression_codec = write_options.batch_compression_type;
-        let compression_type: Option<CompressionType> = compression_codec.into();
+        let compression_type: Option<CompressionType> =
+            compression_codec.map(|v| v.into());
         let compression = {
             if let Some(codec) = compression_type {
                 let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
@@ -458,7 +459,8 @@ impl IpcDataGenerator {
         // get the type of compression
         let compression_codec = write_options.batch_compression_type;
-        let compression_type: Option<CompressionType> = compression_codec.into();
+        let compression_type: Option<CompressionType> =
+            compression_codec.map(|v| v.into());
         let compression = {
             if let Some(codec) = compression_type {
                 let mut c = ipc::BodyCompressionBuilder::new(&mut fbb);
@@ -1070,7 +1072,7 @@ fn write_array_data(
     offset: i64,
     num_rows: usize,
     null_count: usize,
-    compression_codec: &CompressionCodecType,
+    compression_codec: &Option<CompressionCodecType>,
     write_options: &IpcWriteOptions,
 ) -> i64 {
     let mut offset = offset;
@@ -1237,33 +1239,35 @@ fn write_buffer(
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
-    compression_codec: &CompressionCodecType,
+    compression_codec: &Option<CompressionCodecType>,
 ) -> i64 {
     let origin_buffer_len = buffer.len();
     let mut _compression_buffer = Vec::<u8>::new();
     let (data, uncompression_buffer_len) = match compression_codec {
-        CompressionCodecType::NoCompression => {
+        None => {
             // this buffer_len will not used in the following logic
             // If we don't use the compression, just write the data in the array
             (buffer, origin_buffer_len as i64)
         }
-        CompressionCodecType::Lz4Frame | CompressionCodecType::Zstd => {
-            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
-                (buffer, 0)
-            } else if cfg!(feature = "ipc_compression") || cfg!(test) {
-                #[cfg(any(feature = "ipc_compression", test))]
-                compression_codec
-                    .compress(buffer, &mut _compression_buffer)
-                    .unwrap();
-                let compression_len = _compression_buffer.len();
-                if compression_len > origin_buffer_len {
-                    // the length of compressed data is larger than uncompressed data
-                    // use the uncompressed data with -1
-                    // -1 indicate that we don't compress the data
-                    (buffer, LENGTH_NO_COMPRESSED_DATA)
+        Some(_compressor) => {
+            if cfg!(feature = "ipc_compression") || cfg!(test) {
+                if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                    (buffer, LENGTH_EMPTY_COMPRESSED_DATA)
                 } else {
-                    // use the compressed data with uncompressed length
-                    (_compression_buffer.as_slice(), origin_buffer_len as i64)
+                    #[cfg(any(feature = "ipc_compression", test))]
+                    _compressor
+                        .compress(buffer, &mut _compression_buffer)
+                        .unwrap();
+                    let compression_len = _compression_buffer.len();
+                    if compression_len > origin_buffer_len {
+                        // the length of compressed data is larger than uncompressed data
+                        // use the uncompressed data with -1
+                        // -1 indicate that we don't compress the data
+                        (buffer, LENGTH_NO_COMPRESSED_DATA)
+                    } else {
+                        // use the compressed data with uncompressed length
+                        (_compression_buffer.as_slice(), origin_buffer_len as i64)
+                    }
                 }
+            } else {
+                panic!("IPC compression not supported. Compile with feature 'ipc_compression' to enable");
@@ -1271,7 +1275,7 @@ fn write_buffer(
             }
         }
     };
     let len = data.len() as i64;
-    let total_len = if compression_codec == &CompressionCodecType::NoCompression {
+    let total_len = if compression_codec.is_none() {
         buffers.push(ipc::Buffer::new(offset, len));
         len
     } else {
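The tests below exercise these options end to end. In outline, writing and reading a compressed file looks like the following sketch, which mirrors the test code; the path and batch are illustrative, and `FileWriter::try_new_with_options` is assumed as the options-taking constructor behind the truncated `let mut writer =` lines:

    use std::fs::File;
    use crate::ipc;
    use crate::ipc::compression::ipc_compression::CompressionCodecType;
    use crate::ipc::reader::FileReader;
    use crate::ipc::writer::{FileWriter, IpcWriteOptions};
    use crate::record_batch::RecordBatch;

    fn write_then_read(batch: &RecordBatch) {
        let options = IpcWriteOptions::try_new_with_compression(
            8,                                    // alignment
            false,                                // write_legacy_ipc_format
            ipc::MetadataVersion::V5,             // compression requires V5 or above
            Some(CompressionCodecType::Lz4Frame), // or Some(CompressionCodecType::Zstd)
        )
        .unwrap();

        let file = File::create("target/debug/testdata/example.arrow_file").unwrap();
        let mut writer =
            FileWriter::try_new_with_options(file, &batch.schema(), options).unwrap();
        writer.write(batch).unwrap();
        writer.finish().unwrap();

        // Reading is transparent: the codec is recovered from the message header.
        let file = File::open("target/debug/testdata/example.arrow_file").unwrap();
        let reader = FileReader::try_new(file, None).unwrap();
        for batch_result in reader {
            assert_eq!(&batch_result.unwrap(), batch);
        }
    }
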
@@ -1312,6 +1316,7 @@ mod tests {
 
     #[test]
     fn test_write_with_empty_record_batch() {
+        let file_name = "arrow_lz4_empty";
         let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]);
         let values: Vec<Option<i32>> = vec![];
         let array = Int32Array::from(values);
@@ -1320,12 +1325,13 @@ mod tests {
             .unwrap();
         {
             let file =
-                File::create("target/debug/testdata/arrow_lz4.arrow_file").unwrap();
+                File::create(format!("target/debug/testdata/{}.arrow_file", file_name))
+                    .unwrap();
             let write_option = IpcWriteOptions::try_new_with_compression(
                 8,
                 false,
                 ipc::MetadataVersion::V5,
-                CompressionCodecType::Lz4Frame,
+                Some(CompressionCodecType::Lz4Frame),
             )
             .unwrap();
             let mut writer =
@@ -1336,7 +1342,7 @@ mod tests {
         {
             // read file
             let file =
-                File::open(format!("target/debug/testdata/{}.arrow_file", "arrow_lz4"))
+                File::open(format!("target/debug/testdata/{}.arrow_file", file_name))
                     .unwrap();
             let mut reader = FileReader::try_new(file, None).unwrap();
             loop {
@@ -1377,7 +1383,7 @@ mod tests {
             8,
             false,
             ipc::MetadataVersion::V5,
-            CompressionCodecType::Lz4Frame,
+            Some(CompressionCodecType::Lz4Frame),
         )
         .unwrap();
         let mut writer =
@@ -1430,7 +1436,7 @@ mod tests {
             8,
             false,
             ipc::MetadataVersion::V5,
-            CompressionCodecType::Zstd,
+            Some(CompressionCodecType::Zstd),
         )
         .unwrap();
         let mut writer =
@@ -1851,7 +1857,7 @@ mod tests {
             8,
             false,
             ipc::MetadataVersion::V5,
-            CompressionCodecType::Lz4Frame,
+            Some(CompressionCodecType::Lz4Frame),
         )
         .unwrap();
         let mut writer =
@@ -1902,7 +1908,7 @@ mod tests {
             8,
             false,
             ipc::MetadataVersion::V5,
-            CompressionCodecType::Zstd,
+            Some(CompressionCodecType::Zstd),
         )
         .unwrap();
         let mut writer =