Skip to content

Commit

Permalink
Update for change in Decimal interface
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 1, 2022
1 parent 8e4bcce commit 12cb087
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 105 deletions.
13 changes: 11 additions & 2 deletions datafusion/common/src/scalar.rs
Expand Up @@ -28,6 +28,7 @@ use arrow::{
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
DECIMAL_MAX_PRECISION,
},
util::decimal::{BasicDecimal, Decimal128},
};
use ordered_float::OrderedFloat;
use std::cmp::Ordering;
Expand Down Expand Up @@ -1275,7 +1276,11 @@ impl ScalarValue {
if array.is_null(index) {
ScalarValue::Decimal128(None, *precision, *scale)
} else {
ScalarValue::Decimal128(Some(array.value(index)), *precision, *scale)
ScalarValue::Decimal128(
Some(array.value(index).as_i128()),
*precision,
*scale,
)
}
}

Expand Down Expand Up @@ -1450,7 +1455,11 @@ impl ScalarValue {
}
match value {
None => array.is_null(index),
Some(v) => !array.is_null(index) && array.value(index) == *v,
Some(v) => {
!array.is_null(index)
&& array.value(index)
== Decimal128::new(precision, scale, &v.to_le_bytes())
}
}
}

Expand Down
16 changes: 10 additions & 6 deletions datafusion/core/src/physical_plan/hash_utils.rs
Expand Up @@ -62,25 +62,29 @@ fn hash_decimal128<'a>(
if array.null_count() == 0 {
if mul_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
*hash =
combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
*hash = combine_hashes(
i128::get_hash(&array.value(i).as_i128(), random_state),
*hash,
);
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
*hash = i128::get_hash(&array.value(i), random_state);
*hash = i128::get_hash(&array.value(i).as_i128(), random_state);
}
}
} else if mul_col {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
*hash =
combine_hashes(i128::get_hash(&array.value(i), random_state), *hash);
*hash = combine_hashes(
i128::get_hash(&array.value(i).as_i128(), random_state),
*hash,
);
}
}
} else {
for (i, hash) in hashes_buffer.iter_mut().enumerate() {
if !array.is_null(i) {
*hash = i128::get_hash(&array.value(i), random_state);
*hash = i128::get_hash(&array.value(i).as_i128(), random_state);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/scalar.rs
Expand Up @@ -49,15 +49,15 @@ mod tests {
let array = array.as_any().downcast_ref::<DecimalArray>().unwrap();
assert_eq!(1, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
assert_eq!(123i128, array.value(0));
assert_eq!(123i128, array.value(0).as_i128());

// decimal scalar to array with size
let array = decimal_value.to_array_of_size(10);
let array_decimal = array.as_any().downcast_ref::<DecimalArray>().unwrap();
assert_eq!(10, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
assert_eq!(123i128, array_decimal.value(0));
assert_eq!(123i128, array_decimal.value(9));
assert_eq!(123i128, array_decimal.value(0).as_i128());
assert_eq!(123i128, array_decimal.value(9).as_i128());
// test eq array
assert!(decimal_value.eq_array(&array, 1));
assert!(decimal_value.eq_array(&array, 5));
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/aggregate/min_max.rs
Expand Up @@ -182,17 +182,17 @@ macro_rules! typed_min_max_batch_decimal128 {
for i in 1..array.len() {
result = result.$OP(array.value(i));
}
ScalarValue::Decimal128(Some(result), *$PRECISION, *$SCALE)
ScalarValue::Decimal128(Some(result.as_i128()), *$PRECISION, *$SCALE)
} else {
let mut result = 0_i128;
let mut has_value = false;
for i in 0..array.len() {
if !has_value && array.is_valid(i) {
has_value = true;
result = array.value(i);
result = array.value(i).as_i128();
}
if array.is_valid(i) {
result = result.$OP(array.value(i));
result = result.$OP(array.value(i).as_i128());
}
}
ScalarValue::Decimal128(Some(result), *$PRECISION, *$SCALE)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/aggregate/sum.rs
Expand Up @@ -166,7 +166,7 @@ fn sum_decimal_batch(
let mut result = 0_i128;
for i in 0..array.len() {
if array.is_valid(i) {
result += array.value(i);
result += array.value(i).as_i128();
}
}
Ok(ScalarValue::Decimal128(Some(result), *precision, *scale))
Expand Down
102 changes: 57 additions & 45 deletions datafusion/physical-expr/src/expressions/cast.rs
Expand Up @@ -166,6 +166,7 @@ mod tests {
TimestampNanosecondArray, UInt32Array,
},
datatypes::*,
util::decimal::{BasicDecimal, Decimal128},
};
use datafusion_common::Result;

Expand Down Expand Up @@ -278,17 +279,20 @@ mod tests {
.collect::<DecimalArray>()
.with_precision_and_scale(10, 3)?;

// closure that converts to i128
let convert = |v: i128| Decimal128::new(20, 6, &v.to_le_bytes());

generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 3),
DecimalArray,
DataType::Decimal(20, 6),
vec![
Some(1_234_000_i128),
Some(2_222_000_i128),
Some(3_000_i128),
Some(4_000_000_i128),
Some(5_000_000_i128),
Some(convert(1_234_000)),
Some(convert(2_222_000)),
Some(convert(3_000)),
Some(convert(4_000_000)),
Some(convert(5_000_000)),
None,
],
DEFAULT_DATAFUSION_CAST_OPTIONS
Expand All @@ -300,17 +304,18 @@ mod tests {
.collect::<DecimalArray>()
.with_precision_and_scale(10, 3)?;

let convert = |v: i128| Decimal128::new(10, 2, &v.to_le_bytes());
generic_decimal_to_other_test_cast!(
decimal_array,
DataType::Decimal(10, 3),
DecimalArray,
DataType::Decimal(10, 2),
vec![
Some(123_i128),
Some(222_i128),
Some(0_i128),
Some(400_i128),
Some(500_i128),
Some(convert(123)),
Some(convert(222)),
Some(convert(0)),
Some(convert(400)),
Some(convert(500)),
None,
],
DEFAULT_DATAFUSION_CAST_OPTIONS
Expand Down Expand Up @@ -461,120 +466,127 @@ mod tests {
#[test]
fn test_cast_numeric_to_decimal() -> Result<()> {
// int8
let convert = |v: i128| Decimal128::new(3, 0, &v.to_le_bytes());
generic_test_cast!(
Int8Array,
DataType::Int8,
vec![1, 2, 3, 4, 5],
DecimalArray,
DataType::Decimal(3, 0),
vec![
Some(1_i128),
Some(2_i128),
Some(3_i128),
Some(4_i128),
Some(5_i128),
Some(convert(1)),
Some(convert(2)),
Some(convert(3)),
Some(convert(4)),
Some(convert(5)),
],
DEFAULT_DATAFUSION_CAST_OPTIONS
);

// int16
let convert = |v: i128| Decimal128::new(5, 0, &v.to_le_bytes());
generic_test_cast!(
Int16Array,
DataType::Int16,
vec![1, 2, 3, 4, 5],
DecimalArray,
DataType::Decimal(5, 0),
vec![
Some(1_i128),
Some(2_i128),
Some(3_i128),
Some(4_i128),
Some(5_i128),
Some(convert(1)),
Some(convert(2)),
Some(convert(3)),
Some(convert(4)),
Some(convert(5)),
],
DEFAULT_DATAFUSION_CAST_OPTIONS
);

// int32
let convert = |v: i128| Decimal128::new(10, 0, &v.to_le_bytes());
generic_test_cast!(
Int32Array,
DataType::Int32,
vec![1, 2, 3, 4, 5],
DecimalArray,
DataType::Decimal(10, 0),
vec![
Some(1_i128),
Some(2_i128),
Some(3_i128),
Some(4_i128),
Some(5_i128),
Some(convert(1)),
Some(convert(2)),
Some(convert(3)),
Some(convert(4)),
Some(convert(5)),
],
DEFAULT_DATAFUSION_CAST_OPTIONS
);

// int64
let convert = |v: i128| Decimal128::new(20, 0, &v.to_le_bytes());
generic_test_cast!(
Int64Array,
DataType::Int64,
vec![1, 2, 3, 4, 5],
DecimalArray,
DataType::Decimal(20, 0),
vec![
Some(1_i128),
Some(2_i128),
Some(3_i128),
Some(4_i128),
Some(5_i128),
Some(convert(1)),
Some(convert(2)),
Some(convert(3)),
Some(convert(4)),
Some(convert(5)),
],
DEFAULT_DATAFUSION_CAST_OPTIONS
);

// int64 to different scale
let convert = |v: i128| Decimal128::new(20, 2, &v.to_le_bytes());
generic_test_cast!(
Int64Array,
DataType::Int64,
vec![1, 2, 3, 4, 5],
DecimalArray,
DataType::Decimal(20, 2),
vec![
Some(100_i128),
Some(200_i128),
Some(300_i128),
Some(400_i128),
Some(500_i128),
Some(convert(100)),
Some(convert(200)),
Some(convert(300)),
Some(convert(400)),
Some(convert(500)),
],
DEFAULT_DATAFUSION_CAST_OPTIONS
);

// float32
let convert = |v: i128| Decimal128::new(10, 2, &v.to_le_bytes());
generic_test_cast!(
Float32Array,
DataType::Float32,
vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50],
DecimalArray,
DataType::Decimal(10, 2),
vec![
Some(150_i128),
Some(250_i128),
Some(300_i128),
Some(112_i128),
Some(550_i128),
Some(convert(150)),
Some(convert(250)),
Some(convert(300)),
Some(convert(112)),
Some(convert(550)),
],
DEFAULT_DATAFUSION_CAST_OPTIONS
);

// float64
let convert = |v: i128| Decimal128::new(20, 4, &v.to_le_bytes());
generic_test_cast!(
Float64Array,
DataType::Float64,
vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50],
DecimalArray,
DataType::Decimal(20, 4),
vec![
Some(15000_i128),
Some(25000_i128),
Some(30000_i128),
Some(11234_i128),
Some(55000_i128),
Some(convert(15000)),
Some(convert(25000)),
Some(convert(30000)),
Some(convert(11234)),
Some(convert(55000)),
],
DEFAULT_DATAFUSION_CAST_OPTIONS
);
Expand Down

0 comments on commit 12cb087

Please sign in to comment.