Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to arrow 23.0.0 #3483

Merged
merged 8 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.62"
readme = "README.md"

[dependencies]
arrow = "22.0.0"
arrow = "23.0.0"
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "12.0.0" }
dirs = "4.0.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = "22.0.0"
arrow-flight = "23.0.0"
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
Expand Down
5 changes: 4 additions & 1 deletion datafusion-examples/examples/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::pin::Pin;
use std::sync::Arc;

use arrow_flight::SchemaAsIpc;
use datafusion::arrow::error::ArrowError;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use futures::Stream;
Expand Down Expand Up @@ -77,7 +78,9 @@ impl FlightService for FlightServiceImpl {
.unwrap();

let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let schema_result = SchemaAsIpc::new(&schema, &options).into();
let schema_result = SchemaAsIpc::new(&schema, &options)
.try_into()
.map_err(|e: ArrowError| tonic::Status::internal(e.to_string()))?;

Ok(Response::new(schema_result))
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ pyarrow = ["pyo3"]

[dependencies]
apache-avro = { version = "0.14", features = ["snappy"], optional = true }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.87.0", optional = true }
object_store = { version = "0.5.0", optional = true }
ordered-float = "3.0"
parquet = { version = "22.0.0", features = ["arrow"], optional = true }
parquet = { version = "23.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.17.1", optional = true }
sqlparser = "0.23"
6 changes: 3 additions & 3 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.14", optional = true }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
bytes = "1.1"
chrono = { version = "0.4", default-features = false }
Expand All @@ -78,7 +78,7 @@ num_cpus = "1.13.0"
object_store = "0.5.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "22.0.0", features = ["arrow", "async"] }
parquet = { version = "23.0.0", features = ["arrow", "async"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.17.1", optional = true }
Expand All @@ -93,7 +93,7 @@ url = "2.2"
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
arrow = { version = "22.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
arrow = { version = "23.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
async-trait = "0.1.53"
criterion = "0.4"
csv = "1.1.6"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
7 changes: 2 additions & 5 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
use crate::arrow::array::{
make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef,
BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait,
PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
StringDictionaryBuilder,
PrimitiveArray, StringArray, StringBuilder, StringDictionaryBuilder,
};
use crate::arrow::buffer::{Buffer, MutableBuffer};
use crate::arrow::datatypes::{
Expand Down Expand Up @@ -171,9 +170,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
where
T: ArrowPrimitiveType + ArrowDictionaryKeyType,
{
let key_builder = PrimitiveBuilder::<T>::with_capacity(row_len);
let values_builder = StringBuilder::with_capacity(row_len, 5);
StringDictionaryBuilder::new(key_builder, values_builder)
StringDictionaryBuilder::with_capacity(row_len, row_len, row_len)
}

fn build_wrapped_list_array(
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ pub fn concat_batches(
row_count
);

let mut options = RecordBatchOptions::default();
options.row_count = Some(row_count);
let options = RecordBatchOptions::new().with_row_count(Some(row_count));

RecordBatch::try_new_with_options(schema.clone(), arrays, &options)
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_plan/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ impl SchemaAdapter {
let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);

// Necessary to handle empty batches
let mut options = RecordBatchOptions::default();
options.row_count = Some(batch.num_rows());
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to use the nice builder-style API (`RecordBatchOptions::new().with_row_count(...)`) that @askoa added in apache/arrow-rs#2729.


Ok(RecordBatch::try_new_with_options(
projected_schema,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ path = "src/lib.rs"

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "12.0.0" }
sqlparser = "0.23"
2 changes: 1 addition & 1 deletion datafusion/jit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ path = "src/lib.rs"
jit = []

[dependencies]
arrow = "22.0.0"
arrow = "23.0.0"
cranelift = "0.87.0"
cranelift-jit = "0.87.0"
cranelift-module = "0.87.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ default = ["unicode_expressions"]
unicode_expressions = []

[dependencies]
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "12.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4", default-features = false }
Expand Down
10 changes: 4 additions & 6 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{any::Any, sync::Arc};

use arrow::array::*;
use arrow::compute::kernels::arithmetic::{
add, add_scalar, divide, divide_scalar, modulus, modulus_scalar, multiply,
add, add_scalar, divide_opt, divide_scalar, modulus, modulus_scalar, multiply,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default `divide` kernel changed behavior with this arrow upgrade -- `divide_opt` now provides the "NULL on divide by zero" behavior that DataFusion expects.

multiply_scalar, subtract, subtract_scalar,
};
use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
Expand Down Expand Up @@ -60,7 +60,7 @@ use kernels::{
bitwise_xor, bitwise_xor_scalar,
};
use kernels_arrow::{
add_decimal, add_decimal_scalar, divide_decimal, divide_decimal_scalar,
add_decimal, add_decimal_scalar, divide_decimal_scalar, divide_opt_decimal,
eq_decimal_scalar, gt_decimal_scalar, gt_eq_decimal_scalar, is_distinct_from,
is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_null,
is_distinct_from_utf8, is_not_distinct_from, is_not_distinct_from_bool,
Expand Down Expand Up @@ -844,7 +844,7 @@ impl BinaryExpr {
Operator::Plus => binary_primitive_array_op!(left, right, add),
Operator::Minus => binary_primitive_array_op!(left, right, subtract),
Operator::Multiply => binary_primitive_array_op!(left, right, multiply),
Operator::Divide => binary_primitive_array_op!(left, right, divide),
Operator::Divide => binary_primitive_array_op!(left, right, divide_opt),
Operator::Modulo => binary_primitive_array_op!(left, right, modulus),
Operator::And => {
if left_data_type == &DataType::Boolean {
Expand Down Expand Up @@ -1326,9 +1326,7 @@ mod tests {
let string_type = DataType::Utf8;

// build dictionary
let keys_builder = PrimitiveBuilder::<Int32Type>::with_capacity(10);
let values_builder = arrow::array::StringBuilder::with_capacity(10, 1024);
let mut dict_builder = StringDictionaryBuilder::new(keys_builder, values_builder);
let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();

dict_builder.append("one")?;
dict_builder.append_null();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ pub(crate) fn multiply_decimal_scalar(
Ok(array)
}

pub(crate) fn divide_decimal(
pub(crate) fn divide_opt_decimal(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed `divide_decimal` to `divide_opt_decimal` to be consistent with the arrow `divide_opt` naming.

left: &Decimal128Array,
right: &Decimal128Array,
) -> Result<Decimal128Array> {
Expand Down Expand Up @@ -636,7 +636,7 @@ mod tests {
25,
3,
);
let result = divide_decimal(&left_decimal_array, &right_decimal_array)?;
let result = divide_opt_decimal(&left_decimal_array, &right_decimal_array)?;
let expect = create_decimal_array(
&[Some(123456700), None, Some(22446672), Some(-10037130), None],
25,
Expand Down Expand Up @@ -674,7 +674,8 @@ mod tests {
let left_decimal_array = create_decimal_array(&[Some(101)], 10, 1);
let right_decimal_array = create_decimal_array(&[Some(0)], 1, 1);

let err = divide_decimal(&left_decimal_array, &right_decimal_array).unwrap_err();
let err =
divide_opt_decimal(&left_decimal_array, &right_decimal_array).unwrap_err();
assert_eq!("Arrow error: Divide by zero error", err.to_string());
let err = divide_decimal_scalar(&left_decimal_array, 0).unwrap_err();
assert_eq!("Arrow error: Divide by zero error", err.to_string());
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ default = []
json = ["pbjson", "pbjson-build", "serde", "serde_json"]

[dependencies]
arrow = "22.0.0"
arrow = "23.0.0"
datafusion = { path = "../core", version = "12.0.0" }
datafusion-common = { path = "../common", version = "12.0.0" }
datafusion-expr = { path = "../expr", version = "12.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/row/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ path = "src/lib.rs"
jit = ["datafusion-jit"]

[dependencies]
arrow = "22.0.0"
arrow = "23.0.0"
datafusion-common = { path = "../common", version = "12.0.0" }
datafusion-jit = { path = "../jit", version = "12.0.0", optional = true }
paste = "^1.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ unicode_expressions = []

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "12.0.0" }
datafusion-expr = { path = "../expr", version = "12.0.0" }
hashbrown = "0.12"
Expand Down
16 changes: 14 additions & 2 deletions dev/update_arrow_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,20 @@ def update_version_cargo_toml(cargo_toml, new_version):

for section in ("dependencies", "dev-dependencies"):
for (dep_name, constraint) in doc.get(section, {}).items():
if dep_name in ("arrow", "parquet", "arrow-flight") and constraint.get("version") is not None:
doc[section][dep_name]["version"] = new_version
if dep_name in ("arrow", "parquet", "arrow-flight"):
if type(constraint) == tomlkit.items.String:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is adapted from code I wrote for IOx: https://github.com/influxdata/influxdb_iox/blob/main/scripts/update_arrow_deps.py

Basically, the previous version didn't handle dependencies declared as a bare version string in the TOML, e.g. `arrow = "22.0.0"`.

# handle constraint that is (only) a string like '12'
doc[section][dep_name] = new_version
elif type(constraint) == dict:
# handle constraint that is itself a struct like
# {'version': '12', 'features': ['prettyprint']}
doc[section][dep_name]["version"] = new_version
elif type(constraint) == tomlkit.items.InlineTable:
# handle constraint that is itself a struct like
# {'version': '12', 'features': ['prettyprint']}
doc[section][dep_name]["version"] = new_version
else:
print("Unknown type for {} {}: {}", dep_name, constraint, type(constraint))

with open(cargo_toml, 'w') as f:
f.write(tomlkit.dumps(doc))
Expand Down