Skip to content

Commit

Permalink
Upgrade to arrow 23.0.0 (#3483)
Browse files Browse the repository at this point in the history
* Changes for API

* Update avro code for API changes

* Use divide_opt` kernel

* Update update_arrow_deps.py

* Update arrow dependency to 23.0.0

* Use nicer RecordBatchOptions API

* cleanups

* fix: update
  • Loading branch information
alamb committed Sep 20, 2022
1 parent e873423 commit 67002a0
Show file tree
Hide file tree
Showing 19 changed files with 45 additions and 36 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.62"
readme = "README.md"

[dependencies]
arrow = "22.0.0"
arrow = "23.0.0"
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "12.0.0" }
dirs = "4.0.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = "22.0.0"
arrow-flight = "23.0.0"
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
Expand Down
5 changes: 4 additions & 1 deletion datafusion-examples/examples/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::pin::Pin;
use std::sync::Arc;

use arrow_flight::SchemaAsIpc;
use datafusion::arrow::error::ArrowError;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use futures::Stream;
Expand Down Expand Up @@ -77,7 +78,9 @@ impl FlightService for FlightServiceImpl {
.unwrap();

let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let schema_result = SchemaAsIpc::new(&schema, &options).into();
let schema_result = SchemaAsIpc::new(&schema, &options)
.try_into()
.map_err(|e: ArrowError| tonic::Status::internal(e.to_string()))?;

Ok(Response::new(schema_result))
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ pyarrow = ["pyo3"]

[dependencies]
apache-avro = { version = "0.14", features = ["snappy"], optional = true }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.87.0", optional = true }
object_store = { version = "0.5.0", optional = true }
ordered-float = "3.0"
parquet = { version = "22.0.0", features = ["arrow"], optional = true }
parquet = { version = "23.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.17.1", optional = true }
sqlparser = "0.23"
6 changes: 3 additions & 3 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.14", optional = true }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
bytes = "1.1"
chrono = { version = "0.4", default-features = false }
Expand All @@ -78,7 +78,7 @@ num_cpus = "1.13.0"
object_store = "0.5.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "22.0.0", features = ["arrow", "async"] }
parquet = { version = "23.0.0", features = ["arrow", "async"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.17.1", optional = true }
Expand All @@ -93,7 +93,7 @@ url = "2.2"
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
arrow = { version = "22.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
arrow = { version = "23.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
async-trait = "0.1.53"
criterion = "0.4"
csv = "1.1.6"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
7 changes: 2 additions & 5 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
use crate::arrow::array::{
make_array, Array, ArrayBuilder, ArrayData, ArrayDataBuilder, ArrayRef,
BooleanBuilder, LargeStringArray, ListBuilder, NullArray, OffsetSizeTrait,
PrimitiveArray, PrimitiveBuilder, StringArray, StringBuilder,
StringDictionaryBuilder,
PrimitiveArray, StringArray, StringBuilder, StringDictionaryBuilder,
};
use crate::arrow::buffer::{Buffer, MutableBuffer};
use crate::arrow::datatypes::{
Expand Down Expand Up @@ -171,9 +170,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
where
T: ArrowPrimitiveType + ArrowDictionaryKeyType,
{
let key_builder = PrimitiveBuilder::<T>::with_capacity(row_len);
let values_builder = StringBuilder::with_capacity(row_len, 5);
StringDictionaryBuilder::new(key_builder, values_builder)
StringDictionaryBuilder::with_capacity(row_len, row_len, row_len)
}

fn build_wrapped_list_array(
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ pub fn concat_batches(
row_count
);

let mut options = RecordBatchOptions::default();
options.row_count = Some(row_count);
let options = RecordBatchOptions::new().with_row_count(Some(row_count));

RecordBatch::try_new_with_options(schema.clone(), arrays, &options)
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion/core/src/physical_plan/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ impl SchemaAdapter {
let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);

// Necessary to handle empty batches
let mut options = RecordBatchOptions::default();
options.row_count = Some(batch.num_rows());
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

Ok(RecordBatch::try_new_with_options(
projected_schema,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ path = "src/lib.rs"

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "12.0.0" }
sqlparser = "0.23"
2 changes: 1 addition & 1 deletion datafusion/jit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ path = "src/lib.rs"
jit = []

[dependencies]
arrow = "22.0.0"
arrow = "23.0.0"
cranelift = "0.87.0"
cranelift-jit = "0.87.0"
cranelift-module = "0.87.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ default = ["unicode_expressions"]
unicode_expressions = []

[dependencies]
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "12.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4", default-features = false }
Expand Down
10 changes: 4 additions & 6 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{any::Any, sync::Arc};

use arrow::array::*;
use arrow::compute::kernels::arithmetic::{
add, add_scalar, divide, divide_scalar, modulus, modulus_scalar, multiply,
add, add_scalar, divide_opt, divide_scalar, modulus, modulus_scalar, multiply,
multiply_scalar, subtract, subtract_scalar,
};
use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
Expand Down Expand Up @@ -60,7 +60,7 @@ use kernels::{
bitwise_xor, bitwise_xor_scalar,
};
use kernels_arrow::{
add_decimal, add_decimal_scalar, divide_decimal, divide_decimal_scalar,
add_decimal, add_decimal_scalar, divide_decimal_scalar, divide_opt_decimal,
eq_decimal_scalar, gt_decimal_scalar, gt_eq_decimal_scalar, is_distinct_from,
is_distinct_from_bool, is_distinct_from_decimal, is_distinct_from_null,
is_distinct_from_utf8, is_not_distinct_from, is_not_distinct_from_bool,
Expand Down Expand Up @@ -844,7 +844,7 @@ impl BinaryExpr {
Operator::Plus => binary_primitive_array_op!(left, right, add),
Operator::Minus => binary_primitive_array_op!(left, right, subtract),
Operator::Multiply => binary_primitive_array_op!(left, right, multiply),
Operator::Divide => binary_primitive_array_op!(left, right, divide),
Operator::Divide => binary_primitive_array_op!(left, right, divide_opt),
Operator::Modulo => binary_primitive_array_op!(left, right, modulus),
Operator::And => {
if left_data_type == &DataType::Boolean {
Expand Down Expand Up @@ -1326,9 +1326,7 @@ mod tests {
let string_type = DataType::Utf8;

// build dictionary
let keys_builder = PrimitiveBuilder::<Int32Type>::with_capacity(10);
let values_builder = arrow::array::StringBuilder::with_capacity(10, 1024);
let mut dict_builder = StringDictionaryBuilder::new(keys_builder, values_builder);
let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();

dict_builder.append("one")?;
dict_builder.append_null();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ pub(crate) fn multiply_decimal_scalar(
Ok(array)
}

pub(crate) fn divide_decimal(
pub(crate) fn divide_opt_decimal(
left: &Decimal128Array,
right: &Decimal128Array,
) -> Result<Decimal128Array> {
Expand Down Expand Up @@ -636,7 +636,7 @@ mod tests {
25,
3,
);
let result = divide_decimal(&left_decimal_array, &right_decimal_array)?;
let result = divide_opt_decimal(&left_decimal_array, &right_decimal_array)?;
let expect = create_decimal_array(
&[Some(123456700), None, Some(22446672), Some(-10037130), None],
25,
Expand Down Expand Up @@ -674,7 +674,8 @@ mod tests {
let left_decimal_array = create_decimal_array(&[Some(101)], 10, 1);
let right_decimal_array = create_decimal_array(&[Some(0)], 1, 1);

let err = divide_decimal(&left_decimal_array, &right_decimal_array).unwrap_err();
let err =
divide_opt_decimal(&left_decimal_array, &right_decimal_array).unwrap_err();
assert_eq!("Arrow error: Divide by zero error", err.to_string());
let err = divide_decimal_scalar(&left_decimal_array, 0).unwrap_err();
assert_eq!("Arrow error: Divide by zero error", err.to_string());
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ default = []
json = ["pbjson", "pbjson-build", "serde", "serde_json"]

[dependencies]
arrow = "22.0.0"
arrow = "23.0.0"
datafusion = { path = "../core", version = "12.0.0" }
datafusion-common = { path = "../common", version = "12.0.0" }
datafusion-expr = { path = "../expr", version = "12.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/row/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ path = "src/lib.rs"
jit = ["datafusion-jit"]

[dependencies]
arrow = "22.0.0"
arrow = "23.0.0"
datafusion-common = { path = "../common", version = "12.0.0" }
datafusion-jit = { path = "../jit", version = "12.0.0", optional = true }
paste = "^1.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ unicode_expressions = []

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
arrow = { version = "22.0.0", features = ["prettyprint"] }
arrow = { version = "23.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "12.0.0" }
datafusion-expr = { path = "../expr", version = "12.0.0" }
hashbrown = "0.12"
Expand Down
16 changes: 14 additions & 2 deletions dev/update_arrow_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,20 @@ def update_version_cargo_toml(cargo_toml, new_version):

for section in ("dependencies", "dev-dependencies"):
for (dep_name, constraint) in doc.get(section, {}).items():
if dep_name in ("arrow", "parquet", "arrow-flight") and constraint.get("version") is not None:
doc[section][dep_name]["version"] = new_version
if dep_name in ("arrow", "parquet", "arrow-flight"):
if type(constraint) == tomlkit.items.String:
# handle constraint that is (only) a string like '12',
doc[section][dep_name] = new_version
elif type(constraint) == dict:
# handle constraint that is itself a struct like
# {'version': '12', 'features': ['prettyprint']}
doc[section][dep_name]["version"] = new_version
elif type(constraint) == tomlkit.items.InlineTable:
# handle constraint that is itself a struct like
# {'version': '12', 'features': ['prettyprint']}
doc[section][dep_name]["version"] = new_version
else:
print("Unknown type for {} {}: {}", dep_name, constraint, type(constraint))

with open(cargo_toml, 'w') as f:
f.write(tomlkit.dumps(doc))
Expand Down

0 comments on commit 67002a0

Please sign in to comment.