feat(query): use arrow-rs write parquet #11473

Closed · wants to merge 19 commits
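The gist of the PR: Databend's Parquet write path moves from arrow2/parquet2 to the arrow-rs `parquet` crate. A minimal sketch of that write path, using public APIs from `arrow-array`/`arrow-schema`/`parquet` v37; the helper function and the surrounding glue are assumptions, not code from this diff:

```rust
// Sketch of the arrow-rs write path this PR adopts. The helper itself is
// hypothetical; ArrowWriter and WriterProperties are real v37 APIs.
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::format::FileMetaData;

fn write_to_parquet(buf: &mut Vec<u8>) -> Result<FileMetaData> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![col])?;

    // LZ4_RAW mirrors the CompressionOptions::Lz4Raw default that the removed
    // serialize_to_parquet helper used (see the serialize.rs deletions below).
    let props = WriterProperties::builder()
        .set_compression(Compression::LZ4_RAW)
        .build();

    // ArrowWriter accepts any std::io::Write sink, so an in-memory Vec<u8>
    // plays the role of the old `buf: &mut Vec<u8>` parameter.
    let mut writer = ArrowWriter::try_new(buf, schema, Some(props))?;
    writer.write(&batch)?;
    // close() returns the Thrift-level FileMetaData, which matches the new
    // signatures in block_writer.rs at the end of this diff.
    writer.close()
}
```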
49 changes: 46 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions src/common/arrow/Cargo.toml
@@ -42,9 +42,12 @@ arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2",
     "serde_types",
 ] }
 
+arrow-array = "37.0.0"
 arrow-format = { version = "0.8.0", features = ["flight-data", "flight-service", "ipc"] }
+arrow-schema = "37.0.0"
 futures = "0.3.24"
 native = { package = "strawboat", git = "https://github.com/sundy-li/strawboat", rev = "533c9c4" }
+parquet = "37.0.0"
 parquet2 = { version = "0.17.0", default_features = false, features = ["serde_types"] }
 
 [dev-dependencies]
5 changes: 4 additions & 1 deletion src/common/arrow/src/lib.rs
@@ -19,9 +19,12 @@ mod parquet_write;
 pub mod schema_projection;
 
 pub use arrow;
+pub use arrow_array;
 pub use arrow_format;
+pub use arrow_schema;
 pub use native;
-pub use parquet2 as parquet;
+pub use parquet;
+pub use parquet2;
 pub use parquet_read::read_columns_async;
 pub use parquet_read::read_columns_many_async;
 pub use parquet_write::write_parquet_file;
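With both crates re-exported side by side, call sites can name each implementation unambiguously; a hypothetical pair of imports (both modules exist per this diff):

```rust
use common_arrow::parquet::arrow::ArrowWriter; // arrow-rs, the new write path
use common_arrow::parquet2::read::read_metadata; // parquet2, still used on the read path
```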
16 changes: 14 additions & 2 deletions src/common/exception/src/exception_into.rs
@@ -111,8 +111,20 @@ impl From<common_arrow::arrow::error::Error> for ErrorCode {
     }
 }
 
-impl From<common_arrow::parquet::error::Error> for ErrorCode {
-    fn from(error: common_arrow::parquet::error::Error) -> Self {
+impl From<common_arrow::parquet2::error::Error> for ErrorCode {
+    fn from(error: common_arrow::parquet2::error::Error) -> Self {
         ErrorCode::from_std_error(error)
     }
 }
+
+impl From<common_arrow::parquet::errors::ParquetError> for ErrorCode {
+    fn from(error: common_arrow::parquet::errors::ParquetError) -> Self {
+        ErrorCode::from_std_error(error)
+    }
+}
+
+impl From<common_arrow::arrow_schema::ArrowError> for ErrorCode {
+    fn from(error: common_arrow::arrow_schema::ArrowError) -> Self {
+        ErrorCode::from_std_error(error)
+    }
+}
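The added impls let functions returning `common_exception::Result` use `?` directly on arrow-rs results, alongside the renamed parquet2 conversion. A hedged sketch of a call site (the helper is hypothetical):

```rust
use common_exception::Result;

// close() yields Result<_, ParquetError>; the new From impl converts the
// error branch into ErrorCode automatically via `?`.
fn finish(
    writer: common_arrow::parquet::arrow::ArrowWriter<Vec<u8>>,
) -> Result<common_arrow::parquet::format::FileMetaData> {
    Ok(writer.close()?)
}
```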
2 changes: 1 addition & 1 deletion src/common/storage/src/parquet.rs
@@ -14,7 +14,7 @@
 
 use common_arrow::arrow::datatypes::Schema as ArrowSchema;
 use common_arrow::arrow::io::parquet::read as pread;
-use common_arrow::parquet::metadata::FileMetaData;
+use common_arrow::parquet2::metadata::FileMetaData;
 use common_base::runtime::execute_futures_in_parallel;
 use common_exception::ErrorCode;
 use common_exception::Result;
2 changes: 0 additions & 2 deletions src/query/expression/Cargo.toml
@@ -20,8 +20,6 @@ common-io = { path = "../../common/io" }
 # GitHub dependencies
 
 # Crates.io dependencies
-arrow-array = "37.0.0"
-arrow-schema = "37.0.0"
 base64 = "0.21.0"
 chrono = { workspace = true }
 chrono-tz = { workspace = true }
6 changes: 3 additions & 3 deletions src/query/expression/src/convert_arrow_rs/array.rs
@@ -14,9 +14,9 @@
 
 use std::sync::Arc;
 
-use arrow_array::Array;
-use arrow_schema::ArrowError;
-use arrow_schema::Field;
+use common_arrow::arrow_array::Array;
+use common_arrow::arrow_schema::ArrowError;
+use common_arrow::arrow_schema::Field;
 
 use crate::Column;
 use crate::DataField;
4 changes: 2 additions & 2 deletions src/query/expression/src/convert_arrow_rs/record_batch.rs
@@ -14,8 +14,8 @@
 
 use std::sync::Arc;
 
-use arrow_array::RecordBatch;
-use arrow_schema::ArrowError;
+use common_arrow::arrow_array::RecordBatch;
+use common_arrow::arrow_schema::ArrowError;
 
 use crate::Column;
 use crate::DataBlock;
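This module is the bridge that ArrowWriter needs: DataBlock in, arrow-rs RecordBatch out. A hypothetical round trip; the method names below are illustrative, not read from the diff:

```rust
use common_arrow::arrow_array::RecordBatch;
use common_arrow::arrow_schema::ArrowError;
use common_expression::{DataBlock, DataSchema};

// Assumed API surface: some to/from conversion pair living in this module.
fn round_trip(block: DataBlock, schema: &DataSchema) -> Result<DataBlock, ArrowError> {
    let batch: RecordBatch = block.to_record_batch(schema)?; // assumed name
    DataBlock::from_record_batch(&batch) // assumed name
}
```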
12 changes: 6 additions & 6 deletions src/query/expression/src/convert_arrow_rs/schema.rs
@@ -15,12 +15,12 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use arrow_schema::ArrowError;
-use arrow_schema::DataType as ArrowDataType;
-use arrow_schema::Field as ArrowField;
-use arrow_schema::Fields;
-use arrow_schema::Schema as ArrowSchema;
-use arrow_schema::TimeUnit;
+use common_arrow::arrow_schema::ArrowError;
+use common_arrow::arrow_schema::DataType as ArrowDataType;
+use common_arrow::arrow_schema::Field as ArrowField;
+use common_arrow::arrow_schema::Fields;
+use common_arrow::arrow_schema::Schema as ArrowSchema;
+use common_arrow::arrow_schema::TimeUnit;
 
 use crate::types::decimal::DecimalSize;
 use crate::types::DataType;
2 changes: 1 addition & 1 deletion src/query/expression/src/schema.rs
@@ -1281,7 +1281,7 @@ impl From<&TableField> for ArrowField {
     }
 }
 
-fn set_nullable(ty: &ArrowDataType) -> ArrowDataType {
+pub(crate) fn set_nullable(ty: &ArrowDataType) -> ArrowDataType {
     // if the struct type is nullable, need to set inner fields as nullable
     match ty {
         ArrowDataType::Struct(fields) => {
83 changes: 0 additions & 83 deletions src/query/expression/src/utils/serialize.rs
@@ -16,94 +16,11 @@ use std::cmp::Ordering;
 
 use chrono::Datelike;
 use chrono::NaiveDate;
-use common_arrow::arrow::chunk::Chunk as ArrowChunk;
-use common_arrow::arrow::datatypes::DataType as ArrowDataType;
-use common_arrow::arrow::io::parquet::write::transverse;
-use common_arrow::arrow::io::parquet::write::RowGroupIterator;
-use common_arrow::arrow::io::parquet::write::WriteOptions;
-use common_arrow::parquet::compression::CompressionOptions;
-use common_arrow::parquet::encoding::Encoding;
-use common_arrow::parquet::metadata::ThriftFileMetaData;
-use common_arrow::parquet::write::Version;
-use common_arrow::write_parquet_file;
 use common_exception::ErrorCode;
 use common_exception::Result;
 
 use crate::types::decimal::Decimal;
 use crate::types::decimal::DecimalSize;
 use crate::DataBlock;
 use crate::TableSchema;
 
-pub fn serialize_to_parquet_with_compression(
-    blocks: Vec<DataBlock>,
-    schema: impl AsRef<TableSchema>,
-    buf: &mut Vec<u8>,
-    compression: CompressionOptions,
-) -> Result<(u64, ThriftFileMetaData)> {
-    let arrow_schema = schema.as_ref().to_arrow();
-
-    let row_group_write_options = WriteOptions {
-        write_statistics: false,
-        compression,
-        version: Version::V2,
-        data_pagesize_limit: None,
-    };
-    let batches = blocks
-        .into_iter()
-        .map(ArrowChunk::try_from)
-        .collect::<Result<Vec<_>>>()?;
-
-    let encoding_map = |data_type: &ArrowDataType| match data_type {
-        ArrowDataType::Dictionary(..) => Encoding::RleDictionary,
-        _ => col_encoding(data_type),
-    };
-
-    let encodings: Vec<Vec<_>> = arrow_schema
-        .fields
-        .iter()
-        .map(|f| transverse(&f.data_type, encoding_map))
-        .collect::<Vec<_>>();
-
-    let row_groups = RowGroupIterator::try_new(
-        batches.into_iter().map(Ok),
-        &arrow_schema,
-        row_group_write_options,
-        encodings,
-    )?;
-
-    use common_arrow::parquet::write::WriteOptions as FileWriteOption;
-    let options = FileWriteOption {
-        write_statistics: false,
-        version: Version::V2,
-    };
-
-    match write_parquet_file(buf, row_groups, arrow_schema.clone(), options) {
-        Ok(result) => Ok(result),
-        Err(cause) => Err(ErrorCode::ParquetFileInvalid(cause.to_string())),
-    }
-}
-
-pub fn serialize_to_parquet(
-    blocks: Vec<DataBlock>,
-    schema: impl AsRef<TableSchema>,
-    buf: &mut Vec<u8>,
-) -> Result<(u64, ThriftFileMetaData)> {
-    serialize_to_parquet_with_compression(blocks, schema, buf, CompressionOptions::Lz4Raw)
-}
-
-pub fn col_encoding(_data_type: &ArrowDataType) -> Encoding {
-    // Although encoding does work, parquet2 has not implemented decoding of DeltaLengthByteArray yet, we fallback to Plain
-    // From parquet2: Decoding "DeltaLengthByteArray"-encoded required V2 pages is not yet implemented for Binary.
-    //
-    // match data_type {
-    //     ArrowDataType::Binary
-    //     | ArrowDataType::LargeBinary
-    //     | ArrowDataType::Utf8
-    //     | ArrowDataType::LargeUtf8 => Encoding::DeltaLengthByteArray,
-    //     _ => Encoding::Plain,
-    // }
-    Encoding::Plain
-}
 
 pub const EPOCH_DAYS_FROM_CE: i32 = 719_163;
 
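The deleted col_encoding fallback existed only because parquet2 could not decode DeltaLengthByteArray pages. On the arrow-rs side, encoding choices move into `WriterProperties`; a sketch under the assumption that the column path and encoding choice are left to the caller:

```rust
use parquet::basic::Encoding;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

// Per-column encoding is configured on the writer properties rather than via
// a col_encoding helper. The column path "c" is purely illustrative.
fn props_with_delta_strings() -> WriterProperties {
    WriterProperties::builder()
        .set_column_encoding(ColumnPath::from("c"), Encoding::DELTA_LENGTH_BYTE_ARRAY)
        .build()
}
```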
10 changes: 4 additions & 6 deletions src/query/expression/src/values.rs
@@ -43,6 +43,7 @@ use serde::Serialize;
 use serde::Serializer;
 
 use crate::property::Domain;
+use crate::set_nullable;
 use crate::types::array::ArrayColumn;
 use crate::types::array::ArrayColumnBuilder;
 use crate::types::bitmap::BitmapType;
@@ -1195,13 +1196,10 @@
                         array.clone()
                     })
                     .collect::<Vec<_>>();
+                let ty = set_nullable(arrow_array.data_type());
                 Box::new(
-                    common_arrow::arrow::array::StructArray::try_new(
-                        arrow_array.data_type().clone(),
-                        fields,
-                        Some(validity),
-                    )
-                    .unwrap(),
+                    common_arrow::arrow::array::StructArray::try_new(ty, fields, Some(validity))
+                        .unwrap(),
                 )
             }
             _ => arrow_array.with_validity(Some(validity)),
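The `set_nullable` call fixes a latent mismatch: once a validity bitmap can mark struct slots null, arrow2 expects the inner fields of `DataType::Struct` to be declared nullable as well. A minimal arrow2 sketch of that invariant (inferred from the diff, not copied from it):

```rust
use common_arrow::arrow::array::{Array, Int32Array, StructArray};
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::datatypes::{DataType, Field};

fn nullable_struct() -> StructArray {
    // Inner field declared nullable, consistent with the outer validity bitmap.
    let ty = DataType::Struct(vec![Field::new("a", DataType::Int32, true)]);
    let values: Vec<Box<dyn Array>> = vec![Int32Array::from_slice([1, 2, 3]).boxed()];
    let validity: Bitmap = [true, false, true].into_iter().collect();
    StructArray::new(ty, values, Some(validity))
}
```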
@@ -30,10 +30,10 @@ use common_arrow::arrow::io::parquet::read::read_columns;
 use common_arrow::arrow::io::parquet::read::read_metadata_async;
 use common_arrow::arrow::io::parquet::read::to_deserializer;
 use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
-use common_arrow::parquet::metadata::ColumnChunkMetaData;
-use common_arrow::parquet::metadata::FileMetaData;
-use common_arrow::parquet::metadata::RowGroupMetaData;
-use common_arrow::parquet::read::read_metadata;
+use common_arrow::parquet2::metadata::ColumnChunkMetaData;
+use common_arrow::parquet2::metadata::FileMetaData;
+use common_arrow::parquet2::metadata::RowGroupMetaData;
+use common_arrow::parquet2::read::read_metadata;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::DataBlock;
6 changes: 3 additions & 3 deletions src/query/service/src/test_kits/block_writer.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_arrow::parquet::metadata::ThriftFileMetaData;
+use common_arrow::parquet::format::FileMetaData;
 use common_exception::Result;
 use common_expression::DataBlock;
 use common_expression::FunctionContext;
@@ -57,7 +57,7 @@ impl<'a> BlockWriter<'a> {
         block: DataBlock,
         col_stats: StatisticsOfColumns,
         cluster_stats: Option<ClusterStatistics>,
-    ) -> Result<(BlockMeta, Option<ThriftFileMetaData>)> {
+    ) -> Result<(BlockMeta, Option<FileMetaData>)> {
         let (location, block_id) = self.location_generator.gen_block_location();
 
         let data_accessor = &self.data_accessor;
@@ -98,7 +98,7 @@ impl<'a> BlockWriter<'a> {
         schema: TableSchemaRef,
         block: &DataBlock,
         block_id: Uuid,
-    ) -> Result<(u64, Option<Location>, Option<ThriftFileMetaData>)> {
+    ) -> Result<(u64, Option<Location>, Option<FileMetaData>)> {
         let location = self
             .location_generator
             .block_bloom_index_location(&block_id);