Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to_timestamp #838

Merged
merged 40 commits into from Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b40ff39
functionality and pytest
sarahyurick Oct 5, 2022
93d13d3
style fix
sarahyurick Oct 5, 2022
827cfa9
Merge branch 'main' into to_timestamp
sarahyurick Oct 14, 2022
ab053b0
add format param
sarahyurick Oct 14, 2022
6bdf852
lint
sarahyurick Oct 14, 2022
ed42f94
remove quotes from result
sarahyurick Oct 14, 2022
190624a
return date64 instead of str
sarahyurick Oct 14, 2022
067722d
lint
sarahyurick Oct 14, 2022
a3c6482
Apply suggestions from code review
sarahyurick Oct 18, 2022
16ebc65
add string input and test
sarahyurick Oct 18, 2022
26c2d71
lint
sarahyurick Oct 18, 2022
f7cbf9f
timestampadd parser test
sarahyurick Oct 19, 2022
3ae33a8
change Variadic to Exact
sarahyurick Oct 19, 2022
bd25755
Merge branch 'main' into to_timestamp
sarahyurick Oct 19, 2022
9f80fb5
rust test and pdlike
sarahyurick Oct 24, 2022
228959b
fix rust test maybe?
sarahyurick Oct 24, 2022
beef8c2
minor change
sarahyurick Oct 24, 2022
8b54493
fix rust test
sarahyurick Oct 24, 2022
457bf0d
Merge branch 'main' into to_timestamp
sarahyurick Oct 26, 2022
89af2bf
gpu test?
sarahyurick Oct 27, 2022
78096c1
edit gpu test
sarahyurick Oct 27, 2022
5ca6f96
try again
sarahyurick Oct 28, 2022
e8cc5a1
dask_cudf
sarahyurick Oct 28, 2022
b48426e
try except to_cupy
sarahyurick Oct 28, 2022
6b4e0be
use dd and add scalar/string tests
sarahyurick Oct 28, 2022
3eca988
style fix
sarahyurick Oct 28, 2022
f917a34
pass most gpu tests?
sarahyurick Nov 1, 2022
df688e7
Update call.py
sarahyurick Nov 1, 2022
ae9f38f
Merge branch 'main' into to_timestamp
sarahyurick Nov 1, 2022
65a010d
Apply suggestions from code review
sarahyurick Nov 2, 2022
7078909
Merge branch 'main' into to_timestamp
sarahyurick Nov 2, 2022
1e98a46
add pytest.mark.skip and comments for gpu tests
sarahyurick Nov 2, 2022
b6c5549
update with Ayush's suggestions
sarahyurick Nov 3, 2022
afb33e6
link to issue
sarahyurick Nov 3, 2022
7d35341
Update tests/integration/test_rex.py
sarahyurick Nov 14, 2022
674437b
use np instead of datetime for scalars
sarahyurick Nov 14, 2022
4b1bbd8
wrap str case in np.datetime64
sarahyurick Nov 14, 2022
a2c5d82
Merge branch 'main' into to_timestamp
ayushdg Nov 15, 2022
07e5c18
Merge branch 'main' into to_timestamp
sarahyurick Nov 15, 2022
be05bac
Merge branch 'main' into to_timestamp
ayushdg Nov 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 76 additions & 1 deletion dask_planner/src/dialect.rs
@@ -1,6 +1,11 @@
use core::{iter::Peekable, str::Chars};

use datafusion_sql::sqlparser::dialect::Dialect;
use datafusion_sql::sqlparser::{
ast::{Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Value},
dialect::Dialect,
parser::{Parser, ParserError},
tokenizer::Token,
};

/// SQL dialect type for the Dask planner; parsing customisations are supplied by
/// its `Dialect` trait implementation.
#[derive(Debug)]
pub struct DaskDialect {}
Expand Down Expand Up @@ -37,4 +42,74 @@ impl Dialect for DaskDialect {
fn supports_filter_during_aggregation(&self) -> bool {
true
}

/// override expression parsing
fn parse_prefix(&self, parser: &mut Parser) -> Option<Result<Expr, ParserError>> {
fn parse_expr(parser: &mut Parser) -> Result<Option<Expr>, ParserError> {
match parser.peek_token() {
Token::Word(w) if w.value.to_lowercase() == "timestampadd" => {
sarahyurick marked this conversation as resolved.
Show resolved Hide resolved
// TIMESTAMPADD(YEAR, 2, d)
parser.next_token(); // skip timestampadd
parser.expect_token(&Token::LParen)?;
let time_unit = parser.next_token();
parser.expect_token(&Token::Comma)?;
let n = parser.parse_expr()?;
parser.expect_token(&Token::Comma)?;
let expr = parser.parse_expr()?;
parser.expect_token(&Token::RParen)?;

// convert to function args
let args = vec![
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
Value::SingleQuotedString(time_unit.to_string()),
))),
FunctionArg::Unnamed(FunctionArgExpr::Expr(n)),
FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
];

Ok(Some(Expr::Function(Function {
name: ObjectName(vec![Ident::new("timestampadd")]),
args,
over: None,
distinct: false,
special: false,
})))
}
Token::Word(w) if w.value.to_lowercase() == "to_timestamp" => {
parser.next_token(); // skip to_timestamp
parser.expect_token(&Token::LParen)?;
let expr = parser.parse_expr()?;
let comma = parser.consume_token(&Token::Comma);
let time_format = if comma {
parser.next_token().to_string()
} else {
"%Y-%m-%d %H:%M:%S".to_string()
};
parser.expect_token(&Token::RParen)?;

// convert to function args
let args = vec![
FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
Value::SingleQuotedString(time_format),
))),
];

Ok(Some(Expr::Function(Function {
name: ObjectName(vec![Ident::new("dsql_totimestamp")]),
sarahyurick marked this conversation as resolved.
Show resolved Hide resolved
args,
over: None,
distinct: false,
special: false,
})))
}
_ => Ok(None),
}
}
match parse_expr(parser) {
Ok(Some(expr)) => Some(Ok(expr)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
46 changes: 46 additions & 0 deletions dask_planner/src/parser.rs
Expand Up @@ -1236,6 +1236,52 @@ impl<'a> DaskParser<'a> {
mod test {
use crate::parser::{DaskParser, DaskStatement};

#[test]
fn timestampadd() {
    // The unit keyword must surface as the string literal "YEAR", followed by the
    // count and the column, inside a plain `timestampadd` function call.
    let parsed = DaskParser::parse_sql("SELECT TIMESTAMPADD(YEAR, 2, d) FROM t").unwrap();
    assert_eq!(1, parsed.len());

    let rendered = format!("{:?}", parsed[0]);
    let wanted = concat!(
        "projection: [",
        "UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"timestampadd\", quote_style: None }]), ",
        "args: [",
        "Unnamed(Expr(Value(SingleQuotedString(\"YEAR\")))), ",
        "Unnamed(Expr(Value(Number(\"2\", false)))), ",
        "Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None })))",
        "], over: None, distinct: false, special: false }))",
        "]",
    );
    assert!(rendered.contains(wanted));
}

#[test]
fn to_timestamp() {
    /// Parses `sql`, checks it yields exactly one statement and returns that
    /// statement's `Debug` rendering.
    fn debug_of_single_statement(sql: &str) -> String {
        let parsed = DaskParser::parse_sql(sql).unwrap();
        assert_eq!(1, parsed.len());
        format!("{:?}", parsed[0])
    }

    // Without an explicit format the default pattern is filled in.
    let rendered_default = debug_of_single_statement("SELECT TO_TIMESTAMP(d) FROM t");
    let wanted_default = concat!(
        "projection: [",
        "UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"dsql_totimestamp\", quote_style: None }]), ",
        "args: [",
        "Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None }))), ",
        "Unnamed(Expr(Value(SingleQuotedString(\"%Y-%m-%d %H:%M:%S\"))))",
        "], over: None, distinct: false, special: false }))",
        "]",
    );
    assert!(rendered_default.contains(wanted_default));

    // An explicit format is carried through together with its surrounding quotes.
    let rendered_custom =
        debug_of_single_statement("SELECT TO_TIMESTAMP(d, \"%d/%m/%Y\") FROM t");
    let wanted_custom = concat!(
        "projection: [",
        "UnnamedExpr(Function(Function { name: ObjectName([Ident { value: \"dsql_totimestamp\", quote_style: None }]), ",
        "args: [",
        "Unnamed(Expr(Identifier(Ident { value: \"d\", quote_style: None }))), ",
        "Unnamed(Expr(Value(SingleQuotedString(\"\\\"%d/%m/%Y\\\"\"))))",
        "], over: None, distinct: false, special: false }))",
        "]",
    );
    assert!(rendered_custom.contains(wanted_custom));
}

#[test]
fn create_model() {
let sql = r#"CREATE MODEL my_model WITH (
Expand Down
18 changes: 18 additions & 0 deletions dask_planner/src/sql.rs
Expand Up @@ -152,6 +152,24 @@ impl ContextProvider for DaskSQLContext {
let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Int64)));
return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
}
"dsql_totimestamp" => {
let sig = Signature::one_of(
vec![
TypeSignature::Exact(vec![DataType::Int8, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Int16, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Int32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt8, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt16, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt32, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::UInt64, DataType::Utf8]),
TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]),
],
Volatility::Immutable,
);
let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Date64)));
return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
}
"atan2" | "mod" => {
let sig = Signature::variadic(
vec![DataType::Float64, DataType::Float64],
Expand Down
33 changes: 33 additions & 0 deletions dask_sql/physical/rex/core/call.py
@@ -1,6 +1,7 @@
import logging
import operator
import re
from datetime import datetime
from functools import partial, reduce
from typing import TYPE_CHECKING, Any, Callable, Union

Expand Down Expand Up @@ -613,6 +614,37 @@ def extract(self, what, df: SeriesOrScalar):
raise NotImplementedError(f"Extraction of {what} is not (yet) implemented.")


class ToTimestampOperation(Operation):
    """Operation behind ``dsql_totimestamp``: converts epoch seconds or date strings."""

    def __init__(self):
        super().__init__(self.to_timestamp)

    def to_timestamp(self, df, format):
        """Convert ``df`` using the strftime/strptime pattern ``format``.

        ``df`` may be a scalar string, a scalar number, or a column-like object
        exposing ``dtype``. ``format`` may still carry quote characters; they are
        stripped before use.

        - cudf objects: returned unchanged when ``dtype`` is ``object``, otherwise
          multiplied by ``10**9``; ``format`` is not applied on this path.
        - scalar string: parsed with ``datetime.strptime`` and returned as a
          ``datetime``.
        - scalar number: treated as seconds since the epoch (UTC) and returned as a
          string rendered with ``format``.
        - ``object`` column: passed through ``dd.to_datetime`` and rendered with
          ``format``.
        - any other column: interpreted as epoch seconds and rendered with
          ``format``.
        """
        # Remove double and single quotes from string
        format = format.replace('"', "").replace("'", "")

        # TODO: format timestamps for GPU tests
        if "cudf" in str(type(df)):
            if format != "%Y-%m-%d %H:%M:%S":
                # Report through logging instead of stdout; the value is still
                # returned without the requested formatting.
                logging.getLogger(__name__).warning(
                    "Formatting timestamps not supported on GPU"
                )
            if df.dtype == "object":
                return df
            # NOTE(review): presumably scales epoch seconds to nanoseconds - confirm.
            return df * 10**9

        # String cases
        # isinstance also accepts str subclasses (e.g. numpy string scalars).
        if isinstance(df, str):
            return datetime.strptime(df, format)

        # Integer cases
        # Scalars are handled before any ``dtype`` access because plain Python
        # numbers have no ``dtype`` attribute.
        if np.isscalar(df):
            return datetime.utcfromtimestamp(df).strftime(format)

        if df.dtype == "object":
            converted = dd.to_datetime(df)
        else:
            converted = dd.to_datetime(df, unit="s")
        return converted.dt.strftime(format)


class YearOperation(Operation):
def __init__(self):
super().__init__(self.extract_year)
Expand Down Expand Up @@ -990,6 +1022,7 @@ class RexCallPlugin(BaseRexPlugin):
lambda x: x + pd.tseries.offsets.MonthEnd(1),
lambda x: convert_to_datetime(x) + pd.tseries.offsets.MonthEnd(1),
),
"dsql_totimestamp": ToTimestampOperation(),
# Temporary UDF functions that need to be moved after this POC
"datepart": DatePartOperation(),
"year": YearOperation(),
Expand Down
110 changes: 110 additions & 0 deletions tests/integration/test_rex.py
Expand Up @@ -677,3 +677,113 @@ def test_date_functions(c):
FROM df
"""
)


@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
def test_totimestamp(c, gpu):
    """Check ``to_timestamp`` on an integer column, a string column and scalar literals."""
    if gpu:
        import cudf

    def register(frame):
        # Register ``frame`` as table "df", converting it to cudf first for GPU runs.
        if gpu:
            frame = cudf.from_pandas(frame)
        c.create_table("df", frame, gpu=gpu)

    def check(result, dates):
        # Compare ``result`` with a one-column "date" frame, ignoring dtypes.
        assert_eq(result, pd.DataFrame({"date": dates}), check_dtype=False)

    # --- integer column ---
    register(pd.DataFrame({"a": np.array([1203073300, 1406073600, 2806073600])}))

    result = c.sql(
        """
        SELECT to_timestamp(a) AS date FROM df
        """
    )
    check(
        result,
        [
            datetime(2008, 2, 15, 11, 1, 40),
            datetime(2014, 7, 23),
            datetime(2058, 12, 2, 16, 53, 20),
        ],
    )

    result = c.sql(
        """
        SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
        """
    )
    # The custom-format result is only compared on CPU runs.
    if not gpu:
        check(
            result,
            [
                datetime(2008, 2, 15),
                datetime(2014, 7, 23),
                datetime(2058, 2, 12),
            ],
        )

    # --- string column ---
    register(pd.DataFrame({"a": np.array(["1997-02-28 10:30:00", "1997-03-28 10:30:01"])}))

    result = c.sql(
        """
        SELECT to_timestamp(a) AS date FROM df
        """
    )
    check(
        result,
        [
            datetime(1997, 2, 28, 10, 30, 0),
            datetime(1997, 3, 28, 10, 30, 1),
        ],
    )

    result = c.sql(
        """
        SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
        """
    )
    # The custom-format result is only compared on CPU runs.
    if not gpu:
        check(
            result,
            [
                datetime(1997, 2, 28),
                datetime(1997, 3, 28),
            ],
        )

    # --- scalar literals ---
    int_input = 1203073300
    result = c.sql(f"SELECT to_timestamp({int_input}) as date")
    check(result, [datetime(2008, 2, 15, 11, 1, 40)])

    string_input = "1997-02-28 10:30:00"
    result = c.sql(f"SELECT to_timestamp('{string_input}') as date")
    check(result, [datetime(1997, 2, 28, 10, 30, 0)])