Skip to content

Commit

Permalink
Update to arrow 17.0.0 (#2778)
Browse files (browse the repository at this point in the history)
* Update to arrow 17.0.0

* Update for change in Decimal interface
  • Loading branch information
alamb committed Jul 1, 2022
1 parent 5e26b13 commit 85ca8be
Show file tree
Hide file tree
Showing 19 changed files with 154 additions and 119 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.59"
readme = "README.md"

[dependencies]
-arrow = { version = "16.0.0" }
+arrow = { version = "17.0.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "9.0.0" }
dirs = "4.0.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Expand Up @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
-arrow-flight = { version = "16.0.0" }
+arrow-flight = { version = "17.0.0" }
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/Cargo.toml
Expand Up @@ -38,10 +38,10 @@ jit = ["cranelift-module"]
pyarrow = ["pyo3"]

[dependencies]
-arrow = { version = "16.0.0", features = ["prettyprint"] }
+arrow = { version = "17.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.85.0", optional = true }
ordered-float = "3.0"
-parquet = { version = "16.0.0", features = ["arrow"], optional = true }
+parquet = { version = "17.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = "0.18"
13 changes: 11 additions & 2 deletions datafusion/common/src/scalar.rs
Expand Up @@ -28,6 +28,7 @@ use arrow::{
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
DECIMAL_MAX_PRECISION,
},
+util::decimal::{BasicDecimal, Decimal128},
};
use ordered_float::OrderedFloat;
use std::cmp::Ordering;
Expand Down Expand Up @@ -1275,7 +1276,11 @@ impl ScalarValue {
if array.is_null(index) {
ScalarValue::Decimal128(None, *precision, *scale)
} else {
-ScalarValue::Decimal128(Some(array.value(index)), *precision, *scale)
+ScalarValue::Decimal128(
+Some(array.value(index).as_i128()),
+*precision,
+*scale,
+)
}
}

Expand Down Expand Up @@ -1450,7 +1455,11 @@ impl ScalarValue {
}
match value {
None => array.is_null(index),
-Some(v) => !array.is_null(index) && array.value(index) == *v,
+Some(v) => {
+!array.is_null(index)
+&& array.value(index)
+== Decimal128::new(precision, scale, &v.to_le_bytes())
+}
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/Cargo.toml
Expand Up @@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion

[dependencies]
ahash = { version = "0.7", default-features = false }
-arrow = { version = "16.0.0", features = ["prettyprint"] }
+arrow = { version = "17.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
chrono = { version = "0.4", default-features = false }
Expand All @@ -77,7 +77,7 @@ num-traits = { version = "0.2", optional = true }
num_cpus = "1.13.0"
ordered-float = "3.0"
parking_lot = "0.12"
-parquet = { version = "16.0.0", features = ["arrow"] }
+parquet = { version = "17.0.0", features = ["arrow"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
Expand Up @@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
-arrow = { version = "16.0.0", features = ["prettyprint"] }
+arrow = { version = "17.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
16 changes: 10 additions & 6 deletions datafusion/core/src/physical_plan/hash_utils.rs
Expand Up @@ -62,25 +62,29 @@ fn hash_decimal128<'a>(
if array.null_count() == 0 {
if mul_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
-*hash =
-combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
+*hash = combine_hashes(
+i128::get_hash(&array.value(i).as_i128(), random_state),
+*hash,
+);
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
-*hash = i128::get_hash(&array.value(i), random_state);
+*hash = i128::get_hash(&array.value(i).as_i128(), random_state);
}
}
} else if mul_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
-*hash =
-combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
+*hash = combine_hashes(
+i128::get_hash(&array.value(i).as_i128(), random_state),
+*hash,
+);
}
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
-*hash = i128::get_hash(&array.value(i), random_state);
+*hash = i128::get_hash(&array.value(i).as_i128(), random_state);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/scalar.rs
Expand Up @@ -49,15 +49,15 @@ mod tests {
let array = array.as_any().downcast_ref::<DecimalArray>().unwrap();
assert_eq!(1, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
-assert_eq!(123i128, array.value(0));
+assert_eq!(123i128, array.value(0).as_i128());

// decimal scalar to array with size
let array = decimal_value.to_array_of_size(10);
let array_decimal = array.as_any().downcast_ref::<DecimalArray>().unwrap();
assert_eq!(10, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
-assert_eq!(123i128, array_decimal.value(0));
-assert_eq!(123i128, array_decimal.value(9));
+assert_eq!(123i128, array_decimal.value(0).as_i128());
+assert_eq!(123i128, array_decimal.value(9).as_i128());
// test eq array
assert!(decimal_value.eq_array(&array, 1));
assert!(decimal_value.eq_array(&array, 5));
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/Cargo.toml
Expand Up @@ -36,6 +36,6 @@ path = "src/lib.rs"

[dependencies]
ahash = { version = "0.7", default-features = false }
-arrow = { version = "16.0.0", features = ["prettyprint"] }
+arrow = { version = "17.0.0", features = ["prettyprint"] }
datafusion-common = { path = "../common", version = "9.0.0" }
sqlparser = "0.18"
2 changes: 1 addition & 1 deletion datafusion/jit/Cargo.toml
Expand Up @@ -36,7 +36,7 @@ path = "src/lib.rs"
jit = []

[dependencies]
-arrow = { version = "16.0.0" }
+arrow = { version = "17.0.0" }
cranelift = "0.85.0"
cranelift-jit = "0.85.0"
cranelift-module = "0.85.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/Cargo.toml
Expand Up @@ -37,7 +37,7 @@ default = ["unicode_expressions"]
unicode_expressions = []

[dependencies]
-arrow = { version = "16.0.0", features = ["prettyprint"] }
+arrow = { version = "17.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
datafusion-common = { path = "../common", version = "9.0.0" }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/Cargo.toml
Expand Up @@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"]

[dependencies]
ahash = { version = "0.7", default-features = false }
-arrow = { version = "16.0.0", features = ["prettyprint"] }
+arrow = { version = "17.0.0", features = ["prettyprint"] }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
chrono = { version = "0.4", default-features = false }
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/aggregate/min_max.rs
Expand Up @@ -182,17 +182,17 @@ macro_rules! typed_min_max_batch_decimal128 {
for i in 1..array.len() {
result = result.$OP(array.value(i));
}
-ScalarValue::Decimal128(Some(result), *$PRECISION, *$SCALE)
+ScalarValue::Decimal128(Some(result.as_i128()), *$PRECISION, *$SCALE)
} else {
let mut result = 0_i128;
let mut has_value = false;
for i in 0..array.len() {
if !has_value && array.is_valid(i) {
has_value = true;
-result = array.value(i);
+result = array.value(i).as_i128();
}
if array.is_valid(i) {
-result = result.$OP(array.value(i));
+result = result.$OP(array.value(i).as_i128());
}
}
ScalarValue::Decimal128(Some(result), *$PRECISION, *$SCALE)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/sum.rs
Expand Up @@ -166,7 +166,7 @@ fn sum_decimal_batch(
let mut result = 0_i128;
for i in 0..array.len() {
if array.is_valid(i) {
-result += array.value(i);
+result += array.value(i).as_i128();
}
}
Ok(ScalarValue::Decimal128(Some(result), *precision, *scale))
Expand Down

0 comments on commit 85ca8be

Please sign in to comment.