fix build error in hash_utils, hash_aggregate, hash_join, parquet and aggregate
houqp committed Sep 16, 2021
1 parent 00df64a commit 428efa8
Showing 6 changed files with 60 additions and 61 deletions.
9 changes: 5 additions & 4 deletions datafusion/src/physical_plan/hash_aggregate.rs
@@ -409,17 +409,17 @@ fn group_aggregate_batch(
}

// Collect all indices + offsets based on keys in this vec
- let mut batch_indices = MutableBuffer::<i32>::new();
+ let mut batch_indices = MutableBuffer::<u32>::new();
let mut offsets = vec![0];
let mut offset_so_far = 0;
for group_idx in groups_with_rows.iter() {
let indices = &accumulators.group_states[*group_idx].indices;
- batch_indices.append_slice(indices)?;
+ batch_indices.extend_from_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices =
- Int32Array::from_data(DataType::Int32, batch_indices.into(), None);
+ UInt32Array::from_data(DataType::UInt32, batch_indices.into(), None);

// `Take` all values based on indices into Arrays
let values: Vec<Vec<Arc<dyn Array>>> = aggr_input_values
@@ -946,7 +946,8 @@ fn create_batch_from_map(
.iter()
.zip(output_schema.fields().iter())
.map(|(col, desired_field)| {
- arrow::compute::cast::cast(col, desired_field.data_type())
+ arrow::compute::cast::cast(col.as_ref(), desired_field.data_type())
+     .map(|v| Arc::from(v))
})
.collect::<ArrowResult<Vec<_>>>()?;

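The hash_aggregate change swaps the index buffer from i32 to u32 and replaces the fallible `append_slice` with arrow2's infallible `extend_from_slice`, which is why the `?` operators disappear. The sketch below is not part of the commit; it just isolates that pattern, and the import paths are assumptions (the arrow2 crate is referenced as `arrow` in this branch and module layouts differ between versions).

```rust
// Hedged sketch, not from the commit: collecting per-group row indices into a
// UInt32Array the way the patched hash_aggregate code does.
use arrow::array::UInt32Array;
use arrow::buffer::MutableBuffer;
use arrow::datatypes::DataType;

fn collect_batch_indices(groups: &[Vec<u32>]) -> (UInt32Array, Vec<usize>) {
    let mut batch_indices = MutableBuffer::<u32>::new();
    let mut offsets = vec![0usize];
    let mut offset_so_far = 0usize;
    for indices in groups {
        // extend_from_slice is infallible, unlike the old append_slice
        batch_indices.extend_from_slice(indices);
        offset_so_far += indices.len();
        offsets.push(offset_so_far);
    }
    // u32 indices line up with UInt32Array, hence the i32 -> u32 switch
    let batch_indices =
        UInt32Array::from_data(DataType::UInt32, batch_indices.into(), None);
    (batch_indices, offsets)
}
```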
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/hash_join.rs
@@ -757,8 +757,8 @@ fn build_join_indexes(
// If no rows matched left, still must keep the right
// with all nulls for left
if no_match {
- left_indices.push(None)?;
- right_indices.push(Some(row as u32))?;
+ left_indices.push(None);
+ right_indices.push(Some(row as u32));
}
}
None => {
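In hash_join the only fix is dropping the `?`: pushing into arrow2's mutable arrays returns `()` rather than a `Result`. A minimal sketch of that behaviour, assuming the index builders are `MutablePrimitiveArray`s (the concrete builder type is not shown in this hunk):

```rust
// Hedged sketch, not from the commit: push on arrow2 mutable arrays is
// infallible, so no `?` is needed. MutablePrimitiveArray is an assumption
// about the builder type used for the join indices.
use arrow::array::{MutablePrimitiveArray, PrimitiveArray};

fn keep_unmatched_right_rows(
    rows_without_match: &[u32],
) -> (PrimitiveArray<u64>, PrimitiveArray<u32>) {
    let mut left_indices = MutablePrimitiveArray::<u64>::new();
    let mut right_indices = MutablePrimitiveArray::<u32>::new();
    for &row in rows_without_match {
        // keep the right row, with a null on the left side
        left_indices.push(None);
        right_indices.push(Some(row));
    }
    (left_indices.into(), right_indices.into())
}
```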
17 changes: 10 additions & 7 deletions datafusion/src/physical_plan/hash_utils.rs
@@ -594,7 +594,8 @@ pub fn create_hashes<'a>(
mod tests {
use std::sync::Arc;

- use arrow::array::DictionaryArray;
+ use arrow::array::TryExtend;
+ use arrow::array::{DictionaryArray, MutableDictionaryArray, MutableUtf8Array};

use super::*;

@@ -659,8 +660,8 @@ mod tests {

#[test]
fn create_hashes_for_float_arrays() -> Result<()> {
- let f32_arr = Arc::new(Float32Array::from(vec![0.12, 0.5, 1f32, 444.7]));
- let f64_arr = Arc::new(Float64Array::from(vec![0.12, 0.5, 1f64, 444.7]));
+ let f32_arr = Arc::new(Float32Array::from_slice(&[0.12, 0.5, 1f32, 444.7]));
+ let f64_arr = Arc::new(Float64Array::from_slice(&[0.12, 0.5, 1f64, 444.7]));

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; f32_arr.len()];
@@ -680,8 +681,9 @@ mod tests {
let strings = vec![Some("foo"), None, Some("bar"), Some("foo"), None];

let string_array = Arc::new(strings.iter().cloned().collect::<Utf8Array<i32>>());
- let dict_array =
-     Arc::new(strings.iter().cloned().collect::<DictionaryArray<i8>>());
+ let dict_array = MutableDictionaryArray::<i8, MutableUtf8Array<i32>>::new();
+ dict_array.try_extend(strings.iter().cloned()).unwrap();
+ let dict_array = dict_array.into_arc();

let random_state = RandomState::with_seeds(0, 0, 0, 0);

@@ -721,8 +723,9 @@ mod tests {
let strings2 = vec![Some("blarg"), Some("blah"), None];

let string_array = Arc::new(strings1.iter().cloned().collect::<Utf8Array<i32>>());
- let dict_array =
-     Arc::new(strings2.iter().cloned().collect::<DictionaryArray<i32>>());
+ let dict_array = MutableDictionaryArray::<i32, MutableUtf8Array<i32>>::new();
+ dict_array.try_extend(strings2.iter().cloned()).unwrap();
+ let dict_array = dict_array.into_arc();

let random_state = RandomState::with_seeds(0, 0, 0, 0);

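The hash_utils tests stop collecting an iterator straight into a `DictionaryArray` and instead go through the mutable builder: create a `MutableDictionaryArray`, `try_extend` it with optional strings, then freeze it into an `Arc<dyn Array>`. A self-contained sketch of that pattern follows; it is not part of the commit, and the extra `MutableArray` import is an assumption about where `into_arc` comes from in this arrow2 version.

```rust
// Hedged sketch, not from the commit: building a dictionary-encoded string
// array with arrow2's mutable builder, as the updated tests do.
use std::sync::Arc;
use arrow::array::{
    Array, MutableArray, MutableDictionaryArray, MutableUtf8Array, TryExtend,
};

fn dict_from_options(values: &[Option<&str>]) -> Arc<dyn Array> {
    // i32 keys, utf8 values; try_extend dedupes values and records the keys
    let mut dict = MutableDictionaryArray::<i32, MutableUtf8Array<i32>>::new();
    dict.try_extend(values.iter().cloned()).unwrap();
    dict.into_arc()
}

// Usage mirroring the test data:
// let arr = dict_from_options(&[Some("foo"), None, Some("bar"), Some("foo"), None]);
```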
10 changes: 5 additions & 5 deletions datafusion/src/physical_plan/parquet.rs
@@ -514,7 +514,7 @@ macro_rules! get_min_max_values {
.collect();

// ignore errors converting to arrays (e.g. different types)
- ScalarValue::iter_to_array(scalar_values).ok()
+ ScalarValue::iter_to_array(scalar_values).ok().map(|v| Arc::from(v))
}}
}

@@ -536,7 +536,7 @@ fn build_row_group_predicate(
predicate_builder: &PruningPredicate,
metrics: ParquetFileMetrics,
row_group_metadata: &[RowGroupMetaData],
- ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+ ) -> Box<dyn Fn(usize, &RowGroupMetaData) -> bool> {
let parquet_schema = predicate_builder.schema().as_ref();

let pruning_stats = RowGroupPruningStatistics {
@@ -550,14 +550,14 @@ fn build_row_group_predicate(
// NB: false means don't scan row group
let num_pruned = values.iter().filter(|&v| !*v).count();
metrics.row_groups_pruned.add(num_pruned);
- Box::new(move |_, i| values[i])
+ Box::new(move |i, _| values[i])
}
// stats filter array could not be built
// return a closure which will not filter out any row groups
Err(e) => {
debug!("Error evaluating row group predicate values {}", e);
metrics.predicate_evaluation_errors.add(1);
- Box::new(|_r, _i| true)
+ Box::new(|_i, _r| true)
}
}
}
@@ -591,7 +591,7 @@ fn read_partition(
reader.set_groups_filter(Arc::new(build_row_group_predicate(
predicate_builder,
file_metrics,
- reader.metadata().row_groups,
+ &reader.metadata().row_groups,
)));
}

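In parquet.rs the row-group predicate closure's arguments flip so the row-group index comes first, presumably to match what the updated reader's `set_groups_filter` expects, and the row-group metadata is now passed by reference. A minimal sketch of the new closure shape, with `RowGroupMetaData` stubbed out (the real type comes from the parquet crate):

```rust
// Hedged sketch, not from the commit: the pruning closure now takes the
// row-group index first. RowGroupMetaData is a placeholder struct here.
struct RowGroupMetaData;

fn keep_only(values: Vec<bool>) -> Box<dyn Fn(usize, &RowGroupMetaData) -> bool> {
    // index first, metadata second; false means "don't scan this row group"
    Box::new(move |i, _metadata| values[i])
}

fn main() {
    let pred = keep_only(vec![true, false, true]);
    assert!(pred(0, &RowGroupMetaData));
    assert!(!pred(1, &RowGroupMetaData));
}
```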
5 changes: 2 additions & 3 deletions datafusion/src/physical_plan/windows/aggregate.rs
@@ -75,9 +75,8 @@ impl AggregateWindowExpr {
let num_rows = batch.num_rows();
let partition_points =
self.evaluate_partition_points(num_rows, &self.partition_columns(batch)?)?;
- let sort_partition_points = self
-     .evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?
-     .wtf;
+ let sort_partition_points =
+     self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?;
let values = self.evaluate_args(batch)?;
let results = partition_points
.iter()
76 changes: 36 additions & 40 deletions datafusion/src/scalar.rs
Expand Up @@ -350,36 +350,25 @@ macro_rules! build_timestamp_list {
true,
))),
$SIZE,
- ).into();
+ )
+ .into();
null_array
}
Some(values) => {
let values = values.as_ref();
match $TIME_UNIT {
- TimeUnit::Second => build_values_list!(
-     Int64Vec,
-     TimestampSecond,
-     values,
-     $SIZE
- ),
- TimeUnit::Microsecond => build_values_list!(
-     Int64Vec,
-     TimestampMillisecond,
-     values,
-     $SIZE
- ),
- TimeUnit::Millisecond => build_values_list!(
-     Int64Vec,
-     TimestampMicrosecond,
-     values,
-     $SIZE
- ),
- TimeUnit::Nanosecond => build_values_list!(
-     Int64Vec,
-     TimestampNanosecond,
-     values,
-     $SIZE
- ),
+ TimeUnit::Second => {
+     build_values_list!(Int64Vec, TimestampSecond, values, $SIZE)
+ }
+ TimeUnit::Microsecond => {
+     build_values_list!(Int64Vec, TimestampMillisecond, values, $SIZE)
+ }
+ TimeUnit::Millisecond => {
+     build_values_list!(Int64Vec, TimestampMicrosecond, values, $SIZE)
+ }
+ TimeUnit::Nanosecond => {
+     build_values_list!(Int64Vec, TimestampNanosecond, values, $SIZE)
+ }
}
}
}
@@ -534,7 +523,7 @@ impl ScalarValue {
/// Example
/// ```
/// use datafusion::scalar::ScalarValue;
- /// use arrow::array::{ArrayRef, BooleanArray};
+ /// use arrow::array::BooleanArray;
///
/// let scalars = vec![
/// ScalarValue::Boolean(Some(true)),
@@ -546,7 +535,7 @@ impl ScalarValue {
/// let array = ScalarValue::iter_to_array(scalars.into_iter())
/// .unwrap();
///
- /// let expected: ArrayRef = std::sync::Arc::new(
+ /// let expected: Box<dyn Array> = Box::new(
/// BooleanArray::from(vec![
/// Some(true),
/// None,
@@ -558,7 +547,7 @@ impl ScalarValue {
/// ```
pub fn iter_to_array(
scalars: impl IntoIterator<Item = ScalarValue>,
- ) -> Result<ArrayRef> {
+ ) -> Result<Box<dyn Array>> {
let mut scalars = scalars.into_iter().peekable();

// figure out the type based on the first element
@@ -576,7 +565,7 @@ impl ScalarValue {
macro_rules! build_array_primitive {
($TY:ty, $SCALAR_TY:ident, $DT:ident) => {{
{
- Arc::new(scalars
+ Box::new(scalars
.map(|sv| {
if let ScalarValue::$SCALAR_TY(v) = sv {
Ok(v)
@@ -589,7 +578,7 @@ impl ScalarValue {
}
})
.collect::<Result<PrimitiveArray<$TY>>>()?.to($DT)
- ) as ArrayRef
+ ) as Box<dyn Array>
}
}};
}
@@ -612,7 +601,7 @@ impl ScalarValue {
}
})
.collect::<Result<$ARRAY_TY>>()?;
- Arc::new(array)
+ Box::new(array)
}
}};
}
@@ -651,13 +640,13 @@ impl ScalarValue {
}

let array: ListArray<i32> = array.into();
- Arc::new(array)
+ Box::new(array)
}}
}

use DataType::*;
- let array: ArrayRef = match &data_type {
- DataType::Boolean => Arc::new(
+ let array: Box<dyn Array> = match &data_type {
+ DataType::Boolean => Box::new(
scalars
.map(|sv| {
if let ScalarValue::Boolean(v) = sv {
@@ -840,7 +829,9 @@ impl ScalarValue {
None => new_null_array(self.get_datatype(), size).into(),
},
ScalarValue::List(values, data_type) => match data_type.as_ref() {
- DataType::Boolean => build_list!(MutableBooleanArray, Boolean, values, size),
+ DataType::Boolean => {
+     build_list!(MutableBooleanArray, Boolean, values, size)
+ }
DataType::Int8 => build_list!(Int8Vec, Int8, values, size),
DataType::Int16 => build_list!(Int16Vec, Int16, values, size),
DataType::Int32 => build_list!(Int32Vec, Int32, values, size),
@@ -855,7 +846,9 @@ impl ScalarValue {
build_timestamp_list!(unit.clone(), tz.clone(), values, size)
}
DataType::Utf8 => build_list!(MutableStringArray, Utf8, values, size),
- DataType::LargeUtf8 => build_list!(MutableLargeStringArray, LargeUtf8, values, size),
+ DataType::LargeUtf8 => {
+     build_list!(MutableLargeStringArray, LargeUtf8, values, size)
+ }
dt => panic!("Unexpected DataType for list {:?}", dt),
},
ScalarValue::Date32(e) => match e {
@@ -1459,7 +1452,7 @@ mod tests {

let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();

- let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
+ let expected: Box<dyn Array> = Box::new($ARRAYTYPE::from($INPUT));

assert_eq!(&array, &expected);
}};
@@ -1476,7 +1469,7 @@ mod tests {

let array = ScalarValue::iter_to_array(scalars.into_iter()).unwrap();

- let expected: ArrayRef = Arc::new($ARRAYTYPE::from($INPUT));
+ let expected: Box<dyn Array> = Box::new($ARRAYTYPE::from($INPUT));

assert_eq!(&array, &expected);
}};
@@ -1496,7 +1489,7 @@ mod tests {
let expected: $ARRAYTYPE =
$INPUT.iter().map(|v| v.map(|v| v.to_vec())).collect();

- let expected: ArrayRef = Arc::new(expected);
+ let expected: Box<dyn Array> = Box::new(expected);

assert_eq!(&array, &expected);
}};
@@ -1707,7 +1700,10 @@ mod tests {
($INPUT:expr, $INDEX_TY:ty, $SCALAR_TY:ident) => {{
TestCase {
array: {
- let mut array = MutableDictionaryArray::<$INDEX_TY, MutableUtf8Array<i32>>::new();
+ let mut array = MutableDictionaryArray::<
+     $INDEX_TY,
+     MutableUtf8Array<i32>,
+ >::new();
array.try_extend(*($INPUT)).unwrap();
let array: DictionaryArray<$INDEX_TY> = array.into();
Arc::new(array)
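The scalar.rs changes follow one theme: `ScalarValue::iter_to_array` (and the test expectations) now produce `Box<dyn Array>` instead of the `Arc`-based `ArrayRef`. Call sites that still need an `ArrayRef`, like the hash_aggregate and parquet hunks above, bridge the gap with `Arc::from`. A small sketch of that conversion, assuming the `arrow`/`datafusion` paths used in this branch:

```rust
// Hedged sketch, not from the commit: converting the Box<dyn Array> returned
// by iter_to_array into an Arc<dyn Array> (ArrayRef). std's From<Box<T>> for
// Arc<T> handles the conversion for the unsized trait object, which is what
// the `.map(|v| Arc::from(v))` call sites above rely on.
use std::sync::Arc;
use arrow::array::Array;
use datafusion::scalar::ScalarValue;

fn to_array_ref(scalars: Vec<ScalarValue>) -> Arc<dyn Array> {
    let boxed: Box<dyn Array> =
        ScalarValue::iter_to_array(scalars.into_iter()).unwrap();
    Arc::from(boxed)
}
```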
