Skip to content

Commit

Permalink
Add support for all types, fix math
Browse files Browse the repository at this point in the history
  • Loading branch information
avantgardnerio committed Jun 29, 2022
1 parent c37d29e commit 874a5ed
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 53 deletions.
10 changes: 7 additions & 3 deletions datafusion/common/src/scalar.rs
Expand Up @@ -38,6 +38,7 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};
/// Represents a dynamically typed, nullable single value.
/// This is the single-valued counter-part of arrow’s `Array`.
/// https://arrow.apache.org/docs/python/api/datatypes.html
/// https://github.com/apache/arrow/blob/master/format/Schema.fbs#L354-L375
#[derive(Clone)]
pub enum ScalarValue {
/// represents `DataType::Null` (castable to/from any other type)
Expand Down Expand Up @@ -88,11 +89,14 @@ pub enum ScalarValue {
TimestampMicrosecond(Option<i64>, Option<String>),
/// Timestamp Nanoseconds
TimestampNanosecond(Option<i64>, Option<String>),
/// Interval with YearMonth unit
/// Number of elapsed whole months since epoch
IntervalYearMonth(Option<i32>),
/// Interval with DayTime unit
/// Number of elapsed days and milliseconds since epoch (no leap seconds)
/// stored as 2 contiguous 32-bit signed integers
IntervalDayTime(Option<i64>),
/// Interval with MonthDayNano unit
/// A triple of the number of elapsed months, days, and nanoseconds.
/// Months and days are encoded as 32-bit signed integers.
/// Nanoseconds is encoded as a 64-bit signed integer (no leap seconds).
IntervalMonthDayNano(Option<i128>),
/// struct of nested ScalarValue
Struct(Option<Vec<ScalarValue>>, Box<Vec<Field>>),
Expand Down
97 changes: 47 additions & 50 deletions datafusion/physical-expr/src/expressions/datetime.rs
Expand Up @@ -18,7 +18,7 @@
use crate::PhysicalExpr;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use chrono::{Datelike, NaiveDate};
use chrono::{Datelike, Duration, NaiveDate};
use datafusion_common::Result;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_expr::{ColumnarValue, Operator};
Expand Down Expand Up @@ -77,15 +77,15 @@ impl PhysicalExpr for DateIntervalExpr {
self
}

fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result<DataType> {
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
self.lhs.data_type(input_schema)
}

fn nullable(&self, input_schema: &Schema) -> datafusion_common::Result<bool> {
fn nullable(&self, input_schema: &Schema) -> Result<bool> {
self.lhs.nullable(input_schema)
}

fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result<ColumnarValue> {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let dates = self.lhs.evaluate(batch)?;
let intervals = self.rhs.evaluate(batch)?;

Expand All @@ -101,10 +101,8 @@ impl PhysicalExpr for DateIntervalExpr {
// Convert to NaiveDate
let epoch = NaiveDate::from_ymd(1970, 1, 1);
let prior = match operand {
ScalarValue::Date32(Some(date)) => {
epoch.add(chrono::Duration::days(date as i64))
}
ScalarValue::Date64(Some(date)) => epoch.add(chrono::Duration::days(date)),
ScalarValue::Date32(Some(d)) => epoch.add(Duration::days(d as i64)),
ScalarValue::Date64(Some(ms)) => epoch.add(Duration::milliseconds(ms)),
_ => Err(DataFusionError::Execution(format!(
"Invalid lhs type for DateIntervalExpr: {:?}",
operand
Expand All @@ -120,23 +118,10 @@ impl PhysicalExpr for DateIntervalExpr {
))?,
};

// Negate for subtraction
let interval = match &scalar {
ScalarValue::IntervalDayTime(Some(interval)) => *interval,
ScalarValue::IntervalYearMonth(Some(interval)) => *interval as i64,
ScalarValue::IntervalMonthDayNano(Some(_interval)) => {
Err(DataFusionError::Execution(
"DateIntervalExpr does not support IntervalMonthDayNano".to_string(),
))?
}
other => Err(DataFusionError::Execution(format!(
"DateIntervalExpr does not support non-interval type {:?}",
other
)))?,
};
let interval = match &self.op {
Operator::Plus => interval,
Operator::Minus => -interval,
// Invert sign for subtraction
let sign = match &self.op {
Operator::Plus => 1,
Operator::Minus => -1,
_ => {
// this should be unreachable because we check the operators in `try_new`
Err(DataFusionError::Execution(
Expand All @@ -145,42 +130,26 @@ impl PhysicalExpr for DateIntervalExpr {
}
};

// Add interval
// Do math
let posterior = match scalar {
ScalarValue::IntervalDayTime(Some(_)) => {
prior.add(chrono::Duration::days(interval))
}
ScalarValue::IntervalYearMonth(Some(_)) => {
let target = add_months(prior, interval);
let target_plus = add_months(target, 1);
let last_day = target_plus.sub(chrono::Duration::days(1));
let day = min(prior.day(), last_day.day());
NaiveDate::from_ymd(target.year(), target.month(), day)
}
ScalarValue::IntervalMonthDayNano(Some(_)) => {
Err(DataFusionError::Execution(
"DateIntervalExpr does not support IntervalMonthDayNano".to_string(),
))?
}
ScalarValue::IntervalDayTime(Some(i)) => add_day_time(prior, *i, sign),
ScalarValue::IntervalYearMonth(Some(i)) => add_months(prior, *i * sign),
ScalarValue::IntervalMonthDayNano(Some(i)) => add_m_d_nano(prior, *i, sign),
other => Err(DataFusionError::Execution(format!(
"DateIntervalExpr does not support non-interval type {:?}",
other
)))?,
};

// convert back
let posterior = posterior.sub(epoch).num_days();
let res = match operand {
ScalarValue::Date32(Some(_)) => {
let casted = i32::try_from(posterior).map_err(|_| {
DataFusionError::Execution(
"Date arithmetic out of bounds!".to_string(),
)
})?;
ColumnarValue::Scalar(ScalarValue::Date32(Some(casted)))
let days = posterior.sub(epoch).num_days() as i32;
ColumnarValue::Scalar(ScalarValue::Date32(Some(days)))
}
ScalarValue::Date64(Some(_)) => {
ColumnarValue::Scalar(ScalarValue::Date64(Some(posterior)))
let ms = posterior.sub(epoch).num_milliseconds();
ColumnarValue::Scalar(ScalarValue::Date64(Some(ms)))
}
_ => Err(DataFusionError::Execution(format!(
"Invalid lhs type for DateIntervalExpr: {}",
Expand All @@ -191,7 +160,35 @@ impl PhysicalExpr for DateIntervalExpr {
}
}

fn add_months(dt: NaiveDate, delta: i64) -> NaiveDate {
fn add_m_d_nano(prior: NaiveDate, interval: i128, sign: i32) -> NaiveDate {
let interval = interval as u128;
let months = (interval >> 96) as i32 * sign;
let days = (interval >> 64) as i32 * sign;
let nanos = interval as i64 * sign as i64;
let a = add_months(prior, months);
let b = a.add(Duration::days(days as i64));
let c = b.add(Duration::nanoseconds(nanos));
c
}

fn add_day_time(prior: NaiveDate, interval: i64, sign: i32) -> NaiveDate {
let interval = interval as u64;
let ms = (interval >> 32) as i32 * sign;
let days = interval as i32 * sign;
let intermediate = prior.add(Duration::days(days as i64));
let posterior = intermediate.add(Duration::milliseconds(ms as i64));
posterior
}

fn add_months(prior: NaiveDate, interval: i32) -> NaiveDate {
let target = chrono_add_months(prior, interval);
let target_plus = chrono_add_months(target, 1);
let last_day = target_plus.sub(chrono::Duration::days(1));
let day = min(prior.day(), last_day.day());
NaiveDate::from_ymd(target.year(), target.month(), day)
}

fn chrono_add_months(dt: NaiveDate, delta: i32) -> NaiveDate {
let ay = dt.year();
let am = dt.month() as i32 - 1; // zero-based for modulo operations
let bm = am + delta as i32;
Expand Down

0 comments on commit 874a5ed

Please sign in to comment.