From 7dbd4d6d85998a4809b93801f7267937fc419a1e Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Mon, 13 Jun 2022 11:11:16 +0800 Subject: [PATCH] support compression for IPC --- arrow/Cargo.toml | 3 + arrow/src/ipc/compression/compression.rs | 84 ++++++++++++++++++ arrow/src/ipc/compression/mod.rs | 21 +++++ arrow/src/ipc/mod.rs | 1 + arrow/src/ipc/reader.rs | 99 +++++++++++++++++++--- arrow/src/ipc/writer.rs | 103 +++++++++++++++++++++-- 6 files changed, 291 insertions(+), 20 deletions(-) create mode 100644 arrow/src/ipc/compression/compression.rs create mode 100644 arrow/src/ipc/compression/mod.rs diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index a9996b8b475..8878d4a607f 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -38,6 +38,9 @@ path = "src/lib.rs" bench = false [dependencies] +byteorder = "1" +lz4 = "1.23" +zstd = "0.11.1" serde = { version = "1.0" } serde_derive = "1.0" serde_json = { version = "1.0", features = ["preserve_order"] } diff --git a/arrow/src/ipc/compression/compression.rs b/arrow/src/ipc/compression/compression.rs new file mode 100644 index 00000000000..588f905f2e7 --- /dev/null +++ b/arrow/src/ipc/compression/compression.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+use std::io::{Read, Write};
+
+use crate::error::{ArrowError, Result};
+use crate::ipc::CompressionType;
+
+#[derive(Clone, Copy, PartialEq)]
+pub enum CompressionCodecType {
+    NoCompression,
+    Lz4Frame,
+    ZSTD,
+}
+
+impl From<CompressionType> for CompressionCodecType {
+    fn from(compression_type: CompressionType) -> Self {
+        match compression_type {
+            CompressionType::ZSTD => CompressionCodecType::ZSTD,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        }
+    }
+}
+
+impl From<CompressionCodecType> for CompressionType {
+    fn from(codec: CompressionCodecType) -> Self {
+        match codec {
+            CompressionCodecType::NoCompression => CompressionType(-1),
+            CompressionCodecType::Lz4Frame => CompressionType::LZ4_FRAME,
+            CompressionCodecType::ZSTD => CompressionType::ZSTD,
+        }
+    }
+}
+
+impl CompressionCodecType {
+    pub fn compress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<()> {
+        match self {
+            CompressionCodecType::Lz4Frame => {
+                let mut encoder = lz4::EncoderBuilder::new().build(output)?;
+                encoder.write_all(input)?;
+                // `finish` writes the LZ4 frame end marker; dropping the
+                // encoder without it would truncate the stream
+                let (_output, result) = encoder.finish();
+                result?;
+                Ok(())
+            }
+            CompressionCodecType::ZSTD => Err(ArrowError::NotYetImplemented(
+                "ZSTD compression is not yet supported".to_string(),
+            )),
+            _ => Ok(()),
+        }
+    }
+
+    pub fn decompress(&self, input: &[u8], output: &mut Vec<u8>) -> Result<usize> {
+        let result = match self {
+            CompressionCodecType::Lz4Frame => {
+                let mut decoder = lz4::Decoder::new(input)?;
+                decoder.read_to_end(output)
+            }
+            CompressionCodecType::ZSTD => {
+                let mut decoder = zstd::Decoder::new(input)?;
+                decoder.read_to_end(output)
+            }
+            _ => Ok(input.len()),
+        };
+        result.map_err(ArrowError::from)
+    }
+}
\ No newline at end of file
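(Aside, not part of the patch: a minimal round-trip check of the codec API above. `CompressionCodecType` and its module are crate-private, so a test like this would have to live inside the `arrow` crate, e.g. in a `#[cfg(test)]` module of compression.rs; the payload is arbitrary.)

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn lz4_round_trip() {
        let codec = CompressionCodecType::Lz4Frame;
        // a highly compressible payload
        let input = vec![0u8; 1000];

        // compress into a fresh buffer
        let mut compressed = Vec::<u8>::new();
        codec.compress(&input, &mut compressed).unwrap();
        assert!(compressed.len() < input.len());

        // decompress and verify the original bytes come back
        let mut decompressed = Vec::<u8>::new();
        let n = codec.decompress(&compressed, &mut decompressed).unwrap();
        assert_eq!(n, input.len());
        assert_eq!(decompressed, input);
    }
}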
diff --git a/arrow/src/ipc/compression/mod.rs b/arrow/src/ipc/compression/mod.rs
new file mode 100644
index 00000000000..89bcbb493c3
--- /dev/null
+++ b/arrow/src/ipc/compression/mod.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub(crate) mod compression;
+pub(crate) const LENGTH_EMPTY_COMPRESSED_DATA: i64 = 0;
+pub(crate) const LENGTH_NO_COMPRESSED_DATA: i64 = -1;
+pub(crate) const LENGTH_OF_PREFIX_DATA: i64 = 8;
\ No newline at end of file
diff --git a/arrow/src/ipc/mod.rs b/arrow/src/ipc/mod.rs
index d5455b454e7..f19f7dd9187 100644
--- a/arrow/src/ipc/mod.rs
+++ b/arrow/src/ipc/mod.rs
@@ -29,6 +29,7 @@ pub mod writer;
 #[allow(clippy::redundant_field_names)]
 #[allow(non_camel_case_types)]
 pub mod gen;
+mod compression;
 
 pub use self::gen::File::*;
 pub use self::gen::Message::*;
diff --git a/arrow/src/ipc/reader.rs b/arrow/src/ipc/reader.rs
index 41c0c3293ac..6efd419154b 100644
--- a/arrow/src/ipc/reader.rs
+++ b/arrow/src/ipc/reader.rs
@@ -23,6 +23,7 @@ use std::collections::HashMap;
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
+use byteorder::{ByteOrder, LittleEndian};
 
 use crate::array::*;
 use crate::buffer::Buffer;
@@ -34,13 +35,62 @@ use crate::record_batch::{RecordBatch, RecordBatchReader};
 use ipc::CONTINUATION_MARKER;
 use DataType::*;
 
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{
+    LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_NO_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA,
+};
+use crate::ipc::CompressionType;
+
 /// Read a buffer based on offset and length
+/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
-fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
+fn read_buffer(
+    buf: &ipc::Buffer,
+    a_data: &[u8],
+    compression_codec: &CompressionCodecType,
+) -> Buffer {
     let start_offset = buf.offset() as usize;
     let end_offset = start_offset + buf.length() as usize;
     let buf_data = &a_data[start_offset..end_offset];
-    Buffer::from(&buf_data)
+    match compression_codec {
+        CompressionCodecType::NoCompression => Buffer::from(buf_data),
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            // layout: an 8-byte length prefix followed by the body;
+            // read the prefix first, then decompress the body if it is
+            // compressed, otherwise return the raw bytes
+            let decompressed_length = read_uncompressed_size(buf_data);
+            if decompressed_length == LENGTH_EMPTY_COMPRESSED_DATA {
+                // empty buffer
+                let empty = Vec::<u8>::new();
+                Buffer::from(empty)
+            } else if decompressed_length == LENGTH_NO_COMPRESSED_DATA {
+                // the body is not compressed
+                let data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..(end_offset - start_offset)];
+                Buffer::from(data)
+            } else {
+                // decompress data using the codec
+                let mut uncompressed_buffer = Vec::new();
+                let input_data = &buf_data[(LENGTH_OF_PREFIX_DATA as usize)..(end_offset - start_offset)];
+                // TODO consider the error result
+                compression_codec
+                    .decompress(input_data, &mut uncompressed_buffer)
+                    .unwrap();
+                Buffer::from(uncompressed_buffer)
+            }
+        }
+    }
+}
+
+/// Get the uncompressed length
+/// Notes:
+///   -1: indicates that the data that follows is not compressed
+///    0: indicates that there is no data
+///   positive number: the uncompressed length of the data that follows
+fn read_uncompressed_size(buffer: &[u8]) -> i64 {
+    let len_buffer = &buffer[0..8];
+    // 64-bit little-endian signed integer
+    LittleEndian::read_i64(len_buffer)
+}
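(Aside, not part of the patch: the three prefix cases handled above can be pinned down with a small test next to read_uncompressed_size in reader.rs; `to_le_bytes` produces the same 64-bit little-endian layout the format prescribes.)

#[test]
fn test_read_uncompressed_size() {
    // positive prefix: the body is compressed and expands to 42 bytes
    let mut framed = 42i64.to_le_bytes().to_vec();
    framed.extend_from_slice(b"compressed body follows");
    assert_eq!(read_uncompressed_size(&framed), 42);

    // -1 prefix: the body that follows is stored uncompressed
    assert_eq!(
        read_uncompressed_size(&(-1i64).to_le_bytes()),
        LENGTH_NO_COMPRESSED_DATA
    );

    // 0 prefix: an empty buffer with no body at all
    assert_eq!(
        read_uncompressed_size(&0i64.to_le_bytes()),
        LENGTH_EMPTY_COMPRESSED_DATA
    );
}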
 
 /// Coordinates reading arrays based on data types.
@@ -60,6 +110,7 @@ fn create_array(
     dictionaries_by_id: &HashMap<i64, ArrayRef>,
     mut node_index: usize,
     mut buffer_index: usize,
+    compression_codec: &CompressionCodecType,
 ) -> Result<(ArrayRef, usize, usize)> {
     use DataType::*;
     let data_type = field.data_type();
@@ -70,7 +121,7 @@ fn create_array(
             data_type,
             buffers[buffer_index..buffer_index + 3]
                 .iter()
-                .map(|buf| read_buffer(buf, data))
+                .map(|buf| read_buffer(buf, data, compression_codec))
                 .collect(),
         );
         node_index += 1;
@@ -83,7 +134,7 @@ fn create_array(
             data_type,
             buffers[buffer_index..buffer_index + 2]
                 .iter()
-                .map(|buf| read_buffer(buf, data))
+                .map(|buf| read_buffer(buf, data, compression_codec))
                 .collect(),
         );
         node_index += 1;
@@ -94,7 +145,7 @@ fn create_array(
         let list_node = &nodes[node_index];
         let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
             .iter()
-            .map(|buf| read_buffer(buf, data))
+            .map(|buf| read_buffer(buf, data, compression_codec))
             .collect();
         node_index += 1;
         buffer_index += 2;
@@ -106,6 +157,7 @@ fn create_array(
             dictionaries_by_id,
             node_index,
             buffer_index,
+            compression_codec,
         )?;
         node_index = triple.1;
         buffer_index = triple.2;
@@ -116,7 +168,7 @@ fn create_array(
         let list_node = &nodes[node_index];
         let list_buffers: Vec<Buffer> = buffers[buffer_index..=buffer_index]
             .iter()
-            .map(|buf| read_buffer(buf, data))
+            .map(|buf| read_buffer(buf, data, compression_codec))
             .collect();
         node_index += 1;
         buffer_index += 1;
@@ -128,6 +180,7 @@ fn create_array(
             dictionaries_by_id,
             node_index,
             buffer_index,
+            compression_codec,
         )?;
         node_index = triple.1;
         buffer_index = triple.2;
@@ -136,7 +189,7 @@ fn create_array(
     }
     Struct(struct_fields) => {
         let struct_node = &nodes[node_index];
-        let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
+        let null_buffer: Buffer =
+            read_buffer(&buffers[buffer_index], data, compression_codec);
         node_index += 1;
         buffer_index += 1;
@@ -153,6 +206,7 @@ fn create_array(
             dictionaries_by_id,
             node_index,
             buffer_index,
+            compression_codec,
         )?;
         node_index = triple.1;
         buffer_index = triple.2;
@@ -172,7 +226,7 @@ fn create_array(
         let index_node = &nodes[node_index];
         let index_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
             .iter()
-            .map(|buf| read_buffer(buf, data))
+            .map(|buf| read_buffer(buf, data, compression_codec))
             .collect();
 
         let dict_id = field.dict_id().ok_or_else(|| {
@@ -202,13 +256,13 @@ fn create_array(
         let len = union_node.length() as usize;
 
         let type_ids: Buffer =
-            read_buffer(&buffers[buffer_index], data)[..len].into();
+            read_buffer(&buffers[buffer_index], data, compression_codec)[..len].into();
 
         buffer_index += 1;
 
         let value_offsets = match mode {
             UnionMode::Dense => {
-                let buffer = read_buffer(&buffers[buffer_index], data);
+                let buffer = read_buffer(&buffers[buffer_index], data, compression_codec);
                 buffer_index += 1;
                 Some(buffer[..len * 4].into())
             }
@@ -226,6 +280,7 @@ fn create_array(
             dictionaries_by_id,
             node_index,
             buffer_index,
+            compression_codec,
         )?;
 
         node_index = triple.1;
@@ -264,7 +319,7 @@ fn create_array(
             data_type,
             buffers[buffer_index..buffer_index + 2]
                 .iter()
-                .map(|buf| read_buffer(buf, data))
+                .map(|buf| read_buffer(buf, data, compression_codec))
                 .collect(),
         );
         node_index += 1;
@@ -589,6 +644,26 @@ pub fn read_record_batch(
     let field_nodes = batch.nodes().ok_or_else(|| {
         ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
     })?;
+    // TODO: check the compression body logic
+    let compression_codec = match batch.compression() {
+        None => CompressionCodecType::NoCompression,
+        Some(compression) => match compression.codec() {
+            CompressionType::ZSTD => CompressionCodecType::ZSTD,
+            CompressionType::LZ4_FRAME => CompressionCodecType::Lz4Frame,
+            _ => CompressionCodecType::NoCompression,
+        },
+    };
+
     // keep track of buffer and node index, the functions that create arrays mutate these
     let mut buffer_index = 0;
     let mut node_index = 0;
@@ -607,6 +682,7 @@ pub fn read_record_batch(
             dictionaries_by_id,
             node_index,
             buffer_index,
+            &compression_codec,
         )?;
         node_index = triple.1;
         buffer_index = triple.2;
@@ -640,6 +716,7 @@ pub fn read_record_batch(
             dictionaries_by_id,
             node_index,
             buffer_index,
+            &compression_codec,
         )?;
         node_index = triple.1;
         buffer_index = triple.2;
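(Aside, not part of the patch: because the writer below still hard-codes CompressionCodecType::NoCompression, every file it produces exercises the NoCompression arm of read_buffer, so existing round trips keep working. A sketch of such a check through the crate's public FileWriter/FileReader API; the projection argument to FileReader::try_new is assumed from the current crate signature.)

use std::io::Cursor;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
use arrow::record_batch::RecordBatch;

fn round_trip() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // write the batch into an in-memory IPC file
    let mut bytes = Vec::new();
    {
        let mut writer = FileWriter::try_new(&mut bytes, &schema)?;
        writer.write(&batch)?;
        writer.finish()?;
    }

    // read it back and compare
    let mut reader = FileReader::try_new(Cursor::new(bytes), None)?;
    assert_eq!(reader.next().unwrap()?, batch);
    Ok(())
}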
diff --git a/arrow/src/ipc/writer.rs b/arrow/src/ipc/writer.rs
index ffeeadc9d99..b5f7b4078e4 100644
--- a/arrow/src/ipc/writer.rs
+++ b/arrow/src/ipc/writer.rs
@@ -22,6 +22,7 @@
 use std::collections::HashMap;
 use std::io::{BufWriter, Write};
 
+use byteorder::{ByteOrder, LittleEndian};
 
 use flatbuffers::FlatBufferBuilder;
 
@@ -37,6 +38,9 @@
 use crate::record_batch::RecordBatch;
 use crate::util::bit_util;
 
 use ipc::CONTINUATION_MARKER;
+use crate::ipc::compression::compression::CompressionCodecType;
+use crate::ipc::compression::{LENGTH_EMPTY_COMPRESSED_DATA, LENGTH_OF_PREFIX_DATA};
+use crate::ipc::{BodyCompressionMethod, CompressionType};
 
 /// IPC write options used to control the behaviour of the writer
 #[derive(Debug, Clone)]
@@ -55,6 +59,7 @@ pub struct IpcWriteOptions {
     /// version 2.0.0: V4, with legacy format enabled
     /// version 4.0.0: V5
     metadata_version: ipc::MetadataVersion,
+    // batch_compression_type: CompressionType
 }
 
 impl IpcWriteOptions {
@@ -91,6 +96,7 @@ impl IpcWriteOptions {
                 alignment,
                 write_legacy_ipc_format,
                 metadata_version,
+                // batch_compression_type: CompressionType::NO_COMPRESSION,
             }),
         }
     }
}
@@ -105,6 +111,7 @@ impl Default for IpcWriteOptions {
             alignment: 8,
             write_legacy_ipc_format: false,
             metadata_version: ipc::MetadataVersion::V5,
+            // batch_compression_type: CompressionType::NO_COMPRESSION,
         }
     }
 }
@@ -326,6 +333,10 @@ impl IpcDataGenerator {
         let mut buffers: Vec<ipc::Buffer> = vec![];
         let mut arrow_data: Vec<u8> = vec![];
         let mut offset = 0;
+
+        // TODO: the codec is hard-coded to NoCompression until
+        // IpcWriteOptions exposes a compression setting
+        let compression_codec = CompressionCodecType::NoCompression;
+        let compression_type: CompressionType = compression_codec.into();
+        // build the compression descriptor up front: flatbuffers allows only
+        // one in-progress table builder per FlatBufferBuilder, and offsets
+        // are only valid within the builder that produced them, so the nested
+        // BodyCompression table must be finished on `fbb` before the
+        // RecordBatchBuilder below starts
+        let compression = if compression_type.0 >= CompressionType::ENUM_MIN
+            && compression_type.0 <= CompressionType::ENUM_MAX
+        {
+            let mut body_compression_builder = ipc::BodyCompressionBuilder::new(&mut fbb);
+            body_compression_builder.add_method(BodyCompressionMethod::BUFFER);
+            body_compression_builder.add_codec(compression_type);
+            Some(body_compression_builder.finish())
+        } else {
+            None
+        };
         for array in batch.columns() {
             let array_data = array.data();
             offset = write_array_data(
@@ -336,18 +347,26 @@ impl IpcDataGenerator {
                 offset,
                 array.len(),
                 array.null_count(),
+                &compression_codec,
             );
         }
 
         // write data
         let buffers = fbb.create_vector(&buffers);
         let nodes = fbb.create_vector(&nodes);
-
         let root = {
             let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
             batch_builder.add_length(batch.num_rows() as i64);
             batch_builder.add_nodes(nodes);
             batch_builder.add_buffers(buffers);
+            // add the compression descriptor if a codec was specified
+            if let Some(compression) = compression {
+                batch_builder.add_compression(compression);
+            }
             let b = batch_builder.finish();
             b.as_union_value()
         };
@@ -381,6 +400,9 @@ impl IpcDataGenerator {
         let mut buffers: Vec<ipc::Buffer> = vec![];
         let mut arrow_data: Vec<u8> = vec![];
 
+        // TODO: the codec is hard-coded to NoCompression until
+        // IpcWriteOptions exposes a compression setting
+        let compression_codec = CompressionCodecType::NoCompression;
+        let compression_type: CompressionType = compression_codec.into();
+        // as above, finish the nested BodyCompression table on `fbb` before
+        // any other builder starts
+        let compression = if compression_type.0 >= CompressionType::ENUM_MIN
+            && compression_type.0 <= CompressionType::ENUM_MAX
+        {
+            let mut body_compression_builder = ipc::BodyCompressionBuilder::new(&mut fbb);
+            body_compression_builder.add_method(BodyCompressionMethod::BUFFER);
+            body_compression_builder.add_codec(compression_type);
+            Some(body_compression_builder.finish())
+        } else {
+            None
+        };
         write_array_data(
             array_data,
             &mut buffers,
@@ -389,6 +411,7 @@ impl IpcDataGenerator {
             0,
             array_data.len(),
             array_data.null_count(),
+            &compression_codec,
         );
 
         // write data
@@ -400,6 +423,14 @@ impl IpcDataGenerator {
             batch_builder.add_length(array_data.len() as i64);
             batch_builder.add_nodes(nodes);
             batch_builder.add_buffers(buffers);
+            // add the compression descriptor if a codec was specified
+            if let Some(compression) = compression {
+                batch_builder.add_compression(compression);
+            }
             batch_builder.finish()
         };
@@ -858,6 +889,7 @@ fn write_array_data(
     offset: i64,
     num_rows: usize,
     null_count: usize,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
     let mut offset = offset;
     if !matches!(array_data.data_type(), DataType::Null) {
@@ -885,11 +917,11 @@ fn write_array_data(
             Some(buffer) => buffer.clone(),
         };
 
-        offset = write_buffer(&null_buffer, buffers, arrow_data, offset);
+        offset = write_buffer(&null_buffer, buffers, arrow_data, offset, compression_codec);
     }
 
     array_data.buffers().iter().for_each(|buffer| {
-        offset = write_buffer(buffer, buffers, arrow_data, offset);
+        offset = write_buffer(buffer, buffers, arrow_data, offset, compression_codec);
     });
 
     if !matches!(array_data.data_type(), DataType::Dictionary(_, _)) {
@@ -904,6 +936,7 @@ fn write_array_data(
                 offset,
                 data_ref.len(),
                 data_ref.null_count(),
+                compression_codec,
             );
         });
     }
@@ -912,19 +945,64 @@ fn write_array_data(
     offset
 }
 
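(Aside, not part of the patch: the ordering in the two builder blocks above matters. flatbuffers permits only one in-progress table builder per FlatBufferBuilder and panics otherwise, and a WIPOffset is only meaningful to the builder that produced it, so BodyCompression cannot be built on a second, throwaway FlatBufferBuilder. A minimal standalone sketch of the required pattern:)

use arrow::ipc::{self, BodyCompressionMethod, CompressionType};
use flatbuffers::FlatBufferBuilder;

fn build_record_batch_with_compression() {
    let mut fbb = FlatBufferBuilder::new();

    // 1. finish the nested BodyCompression table first, on the same builder
    let compression = {
        let mut builder = ipc::BodyCompressionBuilder::new(&mut fbb);
        builder.add_method(BodyCompressionMethod::BUFFER);
        builder.add_codec(CompressionType::LZ4_FRAME);
        builder.finish()
    };

    // 2. ...then hand its offset to the parent RecordBatch table; starting
    // RecordBatchBuilder before finishing BodyCompression on `fbb` would
    // panic at runtime
    let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
    batch_builder.add_length(0);
    batch_builder.add_compression(compression);
    batch_builder.finish();
}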
 /// Write a buffer to a vector of bytes, and add its ipc::Buffer to a vector
+/// From https://github.com/apache/arrow/blob/6a936c4ff5007045e86f65f1a6b6c3c955ad5103/format/Message.fbs#L58
+/// Each constituent buffer is first compressed with the indicated
+/// compressor, and then written with the uncompressed length in the first 8
+/// bytes as a 64-bit little-endian signed integer followed by the compressed
+/// buffer bytes (and then padding as required by the protocol). The
+/// uncompressed length may be set to -1 to indicate that the data that
+/// follows is not compressed, which can be useful for cases where
+/// compression does not yield appreciable savings.
 fn write_buffer(
     buffer: &Buffer,
     buffers: &mut Vec<ipc::Buffer>,
     arrow_data: &mut Vec<u8>,
     offset: i64,
+    compression_codec: &CompressionCodecType,
 ) -> i64 {
-    let len = buffer.len();
-    let pad_len = pad_to_8(len as u32);
-    let total_len: i64 = (len + pad_len) as i64;
+    let origin_buffer_len = buffer.len();
+    let mut compression_buffer = Vec::<u8>::new();
+    let (data, uncompressed_len) = match compression_codec {
+        CompressionCodecType::NoCompression => {
+            // without compression, just write the array data as-is; the
+            // length returned here is unused on this path
+            (buffer.as_slice(), origin_buffer_len as i64)
+        }
+        CompressionCodecType::Lz4Frame | CompressionCodecType::ZSTD => {
+            if (origin_buffer_len as i64) == LENGTH_EMPTY_COMPRESSED_DATA {
+                (buffer.as_slice(), LENGTH_EMPTY_COMPRESSED_DATA)
+            } else {
+                compression_codec
+                    .compress(buffer.as_slice(), &mut compression_buffer)
+                    .unwrap();
+                if compression_buffer.len() > origin_buffer_len {
+                    // compression grew the data, so write the uncompressed
+                    // bytes instead; the -1 prefix tells the reader that the
+                    // body that follows is not compressed
+                    (buffer.as_slice(), -1)
+                } else {
+                    // use the compressed data, prefixed with the uncompressed length
+                    (compression_buffer.as_slice(), origin_buffer_len as i64)
+                }
+            }
+        }
+    };
+    let len = data.len() as i64;
+    // TODO: no need to pad each buffer; only the tail of the message body
+    // needs padding
+    // let pad_len = pad_to_8(len as u32);
+    // let total_len: i64 = (len + pad_len) as i64;
     // assert_eq!(len % 8, 0, "Buffer width not a multiple of 8 bytes");
-    buffers.push(ipc::Buffer::new(offset, total_len));
-    arrow_data.extend_from_slice(buffer.as_slice());
-    arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
+
+    let total_len = if compression_codec == &CompressionCodecType::NoCompression {
+        buffers.push(ipc::Buffer::new(offset, len));
+        len
+    } else {
+        buffers.push(ipc::Buffer::new(offset, len + LENGTH_OF_PREFIX_DATA));
+        // append the 8-byte little-endian prefix holding the uncompressed
+        // length; `write_i64` needs an 8-byte destination, so stage the
+        // prefix in a fixed-size array rather than writing into `arrow_data`
+        // directly
+        let mut prefix = [0u8; 8];
+        LittleEndian::write_i64(&mut prefix, uncompressed_len);
+        arrow_data.extend_from_slice(&prefix);
+        len + LENGTH_OF_PREFIX_DATA
+    };
+    arrow_data.extend_from_slice(data);
+
+    // arrow_data.extend_from_slice(&vec![0u8; pad_len][..]);
     offset + total_len
 }
@@ -950,6 +1028,13 @@ mod tests {
     use crate::ipc::reader::*;
     use crate::util::integration_util::*;
 
+    #[test]
+    fn vec_write() {
+        // `write_i64` panics if the destination slice is shorter than 8
+        // bytes, so the length prefix is staged in a fixed-size array and
+        // then appended to the growable buffer
+        let mut prefix = [0u8; 8];
+        LittleEndian::write_i64(&mut prefix, 42);
+        let mut buffer = Vec::<u8>::new();
+        buffer.extend_from_slice(&prefix);
+        assert_eq!(buffer, 42i64.to_le_bytes());
+    }
+
     #[test]
     fn test_write_file() {
         let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, false)]);