Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to_timestamp #838

Merged
merged 40 commits into from Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
b40ff39
functionality and pytest
sarahyurick Oct 5, 2022
93d13d3
style fix
sarahyurick Oct 5, 2022
827cfa9
Merge branch 'main' into to_timestamp
sarahyurick Oct 14, 2022
ab053b0
add format param
sarahyurick Oct 14, 2022
6bdf852
lint
sarahyurick Oct 14, 2022
ed42f94
remove quotes from result
sarahyurick Oct 14, 2022
190624a
return date64 instead of str
sarahyurick Oct 14, 2022
067722d
lint
sarahyurick Oct 14, 2022
a3c6482
Apply suggestions from code review
sarahyurick Oct 18, 2022
16ebc65
add string input and test
sarahyurick Oct 18, 2022
26c2d71
lint
sarahyurick Oct 18, 2022
f7cbf9f
timestampadd parser test
sarahyurick Oct 19, 2022
3ae33a8
change Variadic to Exact
sarahyurick Oct 19, 2022
bd25755
Merge branch 'main' into to_timestamp
sarahyurick Oct 19, 2022
9f80fb5
rust test and pdlike
sarahyurick Oct 24, 2022
228959b
fix rust test maybe?
sarahyurick Oct 24, 2022
beef8c2
minor change
sarahyurick Oct 24, 2022
8b54493
fix rust test
sarahyurick Oct 24, 2022
457bf0d
Merge branch 'main' into to_timestamp
sarahyurick Oct 26, 2022
89af2bf
gpu test?
sarahyurick Oct 27, 2022
78096c1
edit gpu test
sarahyurick Oct 27, 2022
5ca6f96
try again
sarahyurick Oct 28, 2022
e8cc5a1
dask_cudf
sarahyurick Oct 28, 2022
b48426e
try except to_cupy
sarahyurick Oct 28, 2022
6b4e0be
use dd and add scalar/string tests
sarahyurick Oct 28, 2022
3eca988
style fix
sarahyurick Oct 28, 2022
f917a34
pass most gpu tests?
sarahyurick Nov 1, 2022
df688e7
Update call.py
sarahyurick Nov 1, 2022
ae9f38f
Merge branch 'main' into to_timestamp
sarahyurick Nov 1, 2022
65a010d
Apply suggestions from code review
sarahyurick Nov 2, 2022
7078909
Merge branch 'main' into to_timestamp
sarahyurick Nov 2, 2022
1e98a46
add pytest.mark.skip and comments for gpu tests
sarahyurick Nov 2, 2022
b6c5549
update with Ayush's suggestions
sarahyurick Nov 3, 2022
afb33e6
link to issue
sarahyurick Nov 3, 2022
7d35341
Update tests/integration/test_rex.py
sarahyurick Nov 14, 2022
674437b
use np instead of datetime for scalars
sarahyurick Nov 14, 2022
4b1bbd8
wrap str case in np.datetime64
sarahyurick Nov 14, 2022
a2c5d82
Merge branch 'main' into to_timestamp
ayushdg Nov 15, 2022
07e5c18
Merge branch 'main' into to_timestamp
sarahyurick Nov 15, 2022
be05bac
Merge branch 'main' into to_timestamp
ayushdg Nov 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
99 changes: 97 additions & 2 deletions dask_planner/src/dialect.rs
@@ -1,6 +1,12 @@
use core::{iter::Peekable, str::Chars};

use core::iter::Peekable;
use core::str::Chars;
use datafusion_sql::sqlparser::ast::{
Expr, Function, FunctionArg, FunctionArgExpr, Ident, ObjectName, Value,
};
use datafusion_sql::sqlparser::dialect::Dialect;
use datafusion_sql::sqlparser::parser::{Parser, ParserError};
use datafusion_sql::sqlparser::tokenizer::Token;
use datafusion_sql::sqlparser::keywords::Keyword;

/// SQL dialect marker type for the Dask planner.
///
/// The struct carries no state; its behaviour comes entirely from its
/// `Dialect` trait implementation in this file.
#[derive(Debug)]
pub struct DaskDialect {}
Expand Down Expand Up @@ -37,4 +43,93 @@ impl Dialect for DaskDialect {
/// Reports whether this dialect supports a filter during aggregation.
///
/// Always returns `true` for `DaskDialect`.
fn supports_filter_during_aggregation(&self) -> bool {
true
}

/// override expression parsing
fn parse_prefix(&self, parser: &mut Parser) -> Option<Result<Expr, ParserError>> {
fn parse_expr(parser: &mut Parser) -> Result<Option<Expr>, ParserError> {
match parser.peek_token() {
Token::Word(w) if w.value.to_lowercase() == "timestampadd" => {
sarahyurick marked this conversation as resolved.
Show resolved Hide resolved
// TIMESTAMPADD(YEAR, 2, d)
parser.next_token(); // skip timestampadd
parser.expect_token(&Token::LParen)?;
let time_unit = parser.next_token();
parser.expect_token(&Token::Comma)?;
let n = parser.parse_expr()?;
parser.expect_token(&Token::Comma)?;
let expr = parser.parse_expr()?;
parser.expect_token(&Token::RParen)?;

// convert to function args
let args = vec![
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
Value::SingleQuotedString(time_unit.to_string()),
))),
FunctionArg::Unnamed(FunctionArgExpr::Expr(n)),
FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
];

Ok(Some(Expr::Function(Function {
name: ObjectName(vec![Ident::new("TIMESTAMPADD")]),
args,
over: None,
distinct: false,
special: false,
})))
}
Token::Word(w) if w.value.to_lowercase() == "to_timestamp" => {
parser.next_token(); // skip to_timestamp
parser.expect_token(&Token::LParen)?;
let expr = parser.parse_expr()?;
let comma = parser.consume_token(&Token::Comma);
if comma {
// Parse TO_TIMESTAMP(d, "%Y-%m-%d %H:%M:%S")
let time_format = parser.next_token();
parser.expect_token(&Token::RParen)?;

// convert to function args
let args = vec![
FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
Value::SingleQuotedString(time_format.to_string()),
))),
];

Ok(Some(Expr::Function(Function {
name: ObjectName(vec![Ident::new("dsql_totimestamp")]),
args,
over: None,
distinct: false,
special: false,
})))
} else {
// Parse TO_TIMESTAMP(d)
let time_format = "%Y-%m-%d %H:%M:%S";
parser.expect_token(&Token::RParen)?;

// convert to function args
let args = vec![
FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)),
FunctionArg::Unnamed(FunctionArgExpr::Expr(Expr::Value(
Value::SingleQuotedString(time_format.to_string()),
))),
];

Ok(Some(Expr::Function(Function {
name: ObjectName(vec![Ident::new("dsql_totimestamp")]),
args,
over: None,
distinct: false,
special: false,
})))
sarahyurick marked this conversation as resolved.
Show resolved Hide resolved
}
sarahyurick marked this conversation as resolved.
Show resolved Hide resolved
}
_ => Ok(None)
}
}
match parse_expr(parser) {
Ok(Some(expr)) => Some(Ok(expr)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
8 changes: 8 additions & 0 deletions dask_planner/src/sql.rs
Expand Up @@ -152,6 +152,14 @@ impl ContextProvider for DaskSQLContext {
let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Int64)));
return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
}
"dsql_totimestamp" => {
let sig = Signature::variadic(
vec![DataType::Date64, DataType::Utf8],
Volatility::Immutable,
);
sarahyurick marked this conversation as resolved.
Show resolved Hide resolved
let rtf: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Utf8)));
return Some(Arc::new(ScalarUDF::new(name, &sig, &rtf, &fun)));
}
"atan2" | "mod" => {
let sig = Signature::variadic(
vec![DataType::Float64, DataType::Float64],
Expand Down
13 changes: 13 additions & 0 deletions dask_sql/physical/rex/core/call.py
Expand Up @@ -600,6 +600,18 @@ def extract(self, what, df: SeriesOrScalar):
raise NotImplementedError(f"Extraction of {what} is not (yet) implemented.")


class ToTimestampOperation(Operation):
    """Format UNIX epoch seconds as timestamp strings (registered as ``dsql_totimestamp``)."""

    def __init__(self):
        super().__init__(self.to_timestamp)

    def to_timestamp(self, df, format):
        """Interpret ``df`` as seconds since the epoch and render it with ``format``.

        Parameters
        ----------
        df
            Epoch seconds: a scalar, a pandas Series, or any other input that
            ``pd.to_datetime`` accepts.
        format
            ``strftime``-style format string.

        Returns
        -------
        A Series of formatted strings for Series/array-like input, or a single
        formatted string for scalar input.
        """
        timestamps = pd.to_datetime(df, unit="s")

        if isinstance(timestamps, pd.Series):
            # A Series exposes strftime only through the .dt accessor; calling
            # .strftime on it directly raises AttributeError.
            return timestamps.dt.strftime(format)

        formatted = timestamps.strftime(format)
        if isinstance(formatted, str):
            # Scalar input produced a single Timestamp. Return the string as-is;
            # iterating over it would split it into individual characters.
            return formatted

        # Index-like result (e.g. a DatetimeIndex built from array-like input):
        # materialise the formatted values into a fresh Series.
        return pd.Series(list(formatted))


class YearOperation(Operation):
def __init__(self):
super().__init__(self.extract_year)
Expand Down Expand Up @@ -976,6 +988,7 @@ class RexCallPlugin(BaseRexPlugin):
lambda x: x + pd.tseries.offsets.MonthEnd(1),
lambda x: convert_to_datetime(x) + pd.tseries.offsets.MonthEnd(1),
),
"dsql_totimestamp": ToTimestampOperation(),
# Temporary UDF functions that need to be moved after this POC
"datepart": DatePartOperation(),
"year": YearOperation(),
Expand Down
45 changes: 45 additions & 0 deletions tests/integration/test_rex.py
Expand Up @@ -656,3 +656,48 @@ def test_date_functions(c):
FROM df
"""
)


def test_totimestamp(c):
    """Check ``to_timestamp`` on an epoch-seconds column, with and without a format."""
    epoch_seconds = [1203073300, 1406073600, 2806073600]
    c.create_table("df", pd.DataFrame({"a": epoch_seconds}))

    # Single-argument form: no explicit format string.
    default_result = c.sql(
        """
SELECT to_timestamp(a) AS date FROM df
"""
    )
    default_expected = pd.DataFrame(
        {
            "date": [
                datetime(2008, 2, 15, 11, 1, 40),
                datetime(2014, 7, 23),
                datetime(2058, 12, 2, 16, 53, 20),
            ]
        }
    )
    assert_eq(default_result, default_expected, check_dtype=False)

    # Two-argument form: explicit day/month/year format.
    formatted_result = c.sql(
        """
SELECT to_timestamp(a, "%d/%m/%Y") AS date FROM df
"""
    )
    formatted_expected = pd.DataFrame(
        {"date": ["15/02/2008", "23/07/2014", "02/12/2058"]}
    )
    assert_eq(formatted_result, formatted_expected, check_dtype=False)