[SPARK-40821][SQL][CORE][PYTHON][SS] Introduce window_time function to extract event time from the window column

### What changes were proposed in this pull request?

This PR introduces a window_time function to extract the streaming event time from a window column produced by window aggregating operators. This is one step in a sequence of fixes required to add support for multiple stateful operators in Spark Structured Streaming, as described in https://issues.apache.org/jira/browse/SPARK-40821

### Why are the changes needed?

The window_time function is a convenience function to compute the correct event time for window aggregate records. Such records, produced by window aggregating operators, have no explicit event time but rather a window column of type StructType { start: TimestampType, end: TimestampType }, where start is inclusive and end is exclusive. The correct event time for such a record is window.end - 1 microsecond (microsecond being the minimal supported event time precision). The event time is necessary when chaining other stateful operators after window aggregating operators.
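
For illustration, here is a minimal sketch of the intended chaining use case (not from this PR; `events`, `eventTime`, and the durations are assumed, and full chaining support depends on the rest of the SPARK-40821 series):

```scala
import org.apache.spark.sql.functions._

// First stateful operator: 5-minute tumbling-window aggregation.
val windowed = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"))
  .count()

// window_time(window) evaluates to window.end - 1 microsecond, giving each
// aggregate record a well-defined event time, so that a second stateful
// operator can be chained after the window aggregation.
val rewindowed = windowed
  .withColumn("eventTime", window_time(col("window")))
  .groupBy(window(col("eventTime"), "10 minutes"))
  .count()
```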

### Does this PR introduce _any_ user-facing change?

Yes. The PR introduces a new window_time SQL function, exposed in both the Scala and Python APIs.

### How was this patch tested?

Added new unit tests.

Closes #38288 from alex-balikov/SPARK-40821-time-window.

Authored-by: Alex Balikov <91913242+alex-balikov@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
alex-balikov authored and HeartSaVioR committed Oct 23, 2022
1 parent e73b157 commit 96b5d50
Showing 11 changed files with 555 additions and 237 deletions.
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.sql/functions.rst
@@ -142,6 +142,7 @@ Datetime Functions
window
session_window
timestamp_seconds
window_time


Collection Functions
46 changes: 46 additions & 0 deletions python/pyspark/sql/functions.py
@@ -4884,6 +4884,52 @@ def check_string_field(field, fieldName): # type: ignore[no-untyped-def]
    return _invoke_function("window", time_col, windowDuration)


def window_time(
    windowColumn: "ColumnOrName",
) -> Column:
    """Computes the event time from a window column. The column window values are produced
    by window aggregating operators and are of type `STRUCT<start: TIMESTAMP, end: TIMESTAMP>`,
    where start is inclusive and end is exclusive. The event time of records produced by window
    aggregating operators can be computed as ``window_time(window)`` and is
    ``window.end - lit(1).alias("microsecond")`` (as microsecond is the minimal supported event
    time precision). The window column must be one produced by a window aggregating operator.

    .. versionadded:: 3.4.0

    Parameters
    ----------
    windowColumn : :class:`~pyspark.sql.Column`
        The window column of a window aggregate record.

    Returns
    -------
    :class:`~pyspark.sql.Column`
        the column for computed results.

    Examples
    --------
    >>> import datetime
    >>> df = spark.createDataFrame(
    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)],
    ... ).toDF("date", "val")

    Group the data into 5 second time windows and aggregate as sum.

    >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))

    Extract the window event time using the window_time function.

    >>> w.select(
    ...     w.window.end.cast("string").alias("end"),
    ...     window_time(w.window).cast("string").alias("window_time"),
    ...     "sum"
    ... ).collect()
    [Row(end='2016-03-11 09:00:10', window_time='2016-03-11 09:00:09.999999', sum=1)]
    """
    window_col = _to_java_column(windowColumn)
    return _invoke_function("window_time", window_col)


def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) -> Column:
    """
    Generates a session window given a timestamp-specifying column.
16 changes: 16 additions & 0 deletions python/pyspark/sql/tests/test_functions.py
@@ -894,6 +894,22 @@ def test_window_functions_cumulative_sum(self):
        for r, ex in zip(rs, expected):
            self.assertEqual(tuple(r), ex[: len(r)])

    def test_window_time(self):
        df = self.spark.createDataFrame(
            [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)], ["date", "val"]
        )
        from pyspark.sql import functions as F

        w = df.groupBy(F.window("date", "5 seconds")).agg(F.sum("val").alias("sum"))
        r = w.select(
            w.window.end.cast("string").alias("end"),
            F.window_time(w.window).cast("string").alias("window_time"),
            "sum",
        ).collect()
        self.assertEqual(
            r[0], Row(end="2016-03-11 09:00:10", window_time="2016-03-11 09:00:09.999999", sum=1)
        )

    def test_collect_functions(self):
        df = self.spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
        from pyspark.sql import functions
@@ -56,7 +56,6 @@ import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType.DAY
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{Utils => CUtils}

@@ -313,6 +312,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveAggregateFunctions ::
TimeWindowing ::
SessionWindowing ::
ResolveWindowTime ::
ResolveDefaultColumns(v1SessionCatalog) ::
ResolveInlineTables ::
ResolveLambdaVariables ::
@@ -3965,242 +3965,6 @@ object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
}
}

/**
* Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to
* figure out how many windows a time column can map to, we over-estimate the number of windows and
* filter out the rows where the time column is not inside the time window.
*/
object TimeWindowing extends Rule[LogicalPlan] {
import org.apache.spark.sql.catalyst.dsl.expressions._

private final val WINDOW_COL_NAME = "window"
private final val WINDOW_START = "start"
private final val WINDOW_END = "end"

/**
* Generates the logical plan for generating window ranges on a timestamp column. Without
* knowing what the timestamp value is, it's non-trivial to figure out deterministically how many
* window ranges a timestamp will map to given all possible combinations of a window duration,
* slide duration and start time (offset). Therefore, we over-estimate the number of windows
* there may be and keep only the valid ones. We use the last Project operator to group
* the window columns into a struct so they can be accessed as `window.start` and `window.end`.
*
* The windows are calculated as below:
* maxNumOverlapping <- ceil(windowDuration / slideDuration)
* for (i <- 0 until maxNumOverlapping)
* lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration
* windowStart <- lastStart - i * slideDuration
* windowEnd <- windowStart + windowDuration
* return windowStart, windowEnd
*
* This behaves as follows for a timestamp of 12:05 with the given parameters. The valid windows are
* marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the
* Filter operator.
* window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
* 11:55 - 12:07 + 11:52 - 12:04 x
* 12:00 - 12:12 + 11:57 - 12:09 +
* 12:05 - 12:17 + 12:02 - 12:14 +
*
* @param plan The logical plan
* @return the logical plan that will generate the time windows using the Expand operator, with
* the Filter operator for correctness and Project for usability.
*/
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(TIME_WINDOW), ruleId) {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val windowExpressions =
p.expressions.flatMap(_.collect { case t: TimeWindow => t }).toSet

val numWindowExpr = p.expressions.flatMap(_.collect {
case s: SessionWindow => s
case t: TimeWindow => t
}).toSet.size

// Only support a single window expression for now
if (numWindowExpr == 1 && windowExpressions.nonEmpty &&
windowExpressions.head.timeColumn.resolved &&
windowExpressions.head.checkInputDataTypes().isSuccess) {

val window = windowExpressions.head

val metadata = window.timeColumn match {
case a: Attribute => a.metadata
case _ => Metadata.empty
}

def getWindow(i: Int, dataType: DataType): Expression = {
val timestamp = PreciseTimestampConversion(window.timeColumn, dataType, LongType)
val lastStart = timestamp - (timestamp - window.startTime
+ window.slideDuration) % window.slideDuration
val windowStart = lastStart - i * window.slideDuration
val windowEnd = windowStart + window.windowDuration

// We make sure value fields are nullable since the dataType of TimeWindow defines them
// as nullable.
CreateNamedStruct(
Literal(WINDOW_START) ::
PreciseTimestampConversion(windowStart, LongType, dataType).castNullable() ::
Literal(WINDOW_END) ::
PreciseTimestampConversion(windowEnd, LongType, dataType).castNullable() ::
Nil)
}

val windowAttr = AttributeReference(
WINDOW_COL_NAME, window.dataType, metadata = metadata)()

if (window.windowDuration == window.slideDuration) {
val windowStruct = Alias(getWindow(0, window.timeColumn.dataType), WINDOW_COL_NAME)(
exprId = windowAttr.exprId, explicitMetadata = Some(metadata))

val replacedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
}

// For backwards compatibility we add a filter to drop null timestamps
val filterExpr = IsNotNull(window.timeColumn)

replacedPlan.withNewChildren(
Project(windowStruct +: child.output,
Filter(filterExpr, child)) :: Nil)
} else {
val overlappingWindows =
math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
val windows =
Seq.tabulate(overlappingWindows)(i =>
getWindow(i, window.timeColumn.dataType))

val projections = windows.map(_ +: child.output)

// When windowDuration % slideDuration = 0, the estimate of the number of
// windows is exact, so all produced windows are valid.
val filterExpr =
if (window.windowDuration % window.slideDuration == 0) {
IsNotNull(window.timeColumn)
} else {
window.timeColumn >= windowAttr.getField(WINDOW_START) &&
window.timeColumn < windowAttr.getField(WINDOW_END)
}

val substitutedPlan = Filter(filterExpr,
Expand(projections, windowAttr +: child.output, child))

val renamedPlan = p transformExpressions {
case t: TimeWindow => windowAttr
}

renamedPlan.withNewChildren(substitutedPlan :: Nil)
}
} else if (numWindowExpr > 1) {
throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
} else {
p // Return unchanged. Analyzer will throw exception later
}
}
}
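
// A hypothetical standalone sketch (not part of the Spark codebase) of the
// window-expansion arithmetic described in the doc comment above, using plain
// Longs in units of minutes instead of Catalyst expressions:
object TimeWindowingSketch {
  def candidateWindows(ts: Long, windowDur: Long, slideDur: Long, startTime: Long): Seq[(Long, Long)] = {
    val maxNumOverlapping = math.ceil(windowDur.toDouble / slideDur).toInt
    val lastStart = ts - (ts - startTime + slideDur) % slideDur
    (0 until maxNumOverlapping).map { i =>
      val windowStart = lastStart - i * slideDur
      (windowStart, windowStart + windowDur) // [start, end)
    }
  }
  // For the doc comment's example, ts = 12:05 = 725 minutes, window 12m, slide 5m:
  // startTime = 0 gives 11:55-12:07, 12:00-12:12, 12:05-12:17 (all contain ts);
  // startTime = 2 gives 11:52-12:04, 11:57-12:09, 12:02-12:14, and the Filter
  // operator drops 11:52-12:04 because ts is not inside it.
}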

/** Maps a time column to a session window. */
object SessionWindowing extends Rule[LogicalPlan] {
import org.apache.spark.sql.catalyst.dsl.expressions._

private final val SESSION_COL_NAME = "session_window"
private final val SESSION_START = "start"
private final val SESSION_END = "end"

/**
* Generates the logical plan for generating session window on a timestamp column.
* Each session window is initially defined as [timestamp, timestamp + gap).
*
* This also adds a marker to the session column so that downstream operators can easily
* find the session window column.
*/
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case p: LogicalPlan if p.children.size == 1 =>
val child = p.children.head
val sessionExpressions =
p.expressions.flatMap(_.collect { case s: SessionWindow => s }).toSet

val numWindowExpr = p.expressions.flatMap(_.collect {
case s: SessionWindow => s
case t: TimeWindow => t
}).toSet.size

// Only support a single session expression for now
if (numWindowExpr == 1 && sessionExpressions.nonEmpty &&
sessionExpressions.head.timeColumn.resolved &&
sessionExpressions.head.checkInputDataTypes().isSuccess) {

val session = sessionExpressions.head

val metadata = session.timeColumn match {
case a: Attribute => a.metadata
case _ => Metadata.empty
}

val newMetadata = new MetadataBuilder()
.withMetadata(metadata)
.putBoolean(SessionWindow.marker, true)
.build()

val sessionAttr = AttributeReference(
SESSION_COL_NAME, session.dataType, metadata = newMetadata)()

val sessionStart =
PreciseTimestampConversion(session.timeColumn, session.timeColumn.dataType, LongType)
val gapDuration = session.gapDuration match {
case expr if Cast.canCast(expr.dataType, CalendarIntervalType) =>
Cast(expr, CalendarIntervalType)
case other =>
throw QueryCompilationErrors.sessionWindowGapDurationDataTypeError(other.dataType)
}
val sessionEnd = PreciseTimestampConversion(session.timeColumn + gapDuration,
session.timeColumn.dataType, LongType)

// We make sure value fields are nullable since the dataType of SessionWindow defines them
// as nullable.
val literalSessionStruct = CreateNamedStruct(
Literal(SESSION_START) ::
PreciseTimestampConversion(sessionStart, LongType, session.timeColumn.dataType)
.castNullable() ::
Literal(SESSION_END) ::
PreciseTimestampConversion(sessionEnd, LongType, session.timeColumn.dataType)
.castNullable() ::
Nil)

val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))

val replacedPlan = p transformExpressions {
case s: SessionWindow => sessionAttr
}

val filterByTimeRange = session.gapDuration match {
case Literal(interval: CalendarInterval, CalendarIntervalType) =>
interval == null || interval.months + interval.days + interval.microseconds <= 0
case _ => true
}

// As with the tumbling window, we add a filter to drop nulls.
// We also filter out events with a negative, zero, or otherwise invalid gap duration.
val filterExpr = if (filterByTimeRange) {
IsNotNull(session.timeColumn) &&
(sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))
} else {
IsNotNull(session.timeColumn)
}

replacedPlan.withNewChildren(
Filter(filterExpr,
Project(sessionStruct +: child.output, child)) :: Nil)
} else if (numWindowExpr > 1) {
throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
} else {
p // Return unchanged. Analyzer will throw exception later
}
}
}
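
// A hypothetical standalone sketch (not part of the Spark codebase) of the
// per-event session assignment and filter implemented by the rule above, with
// timestamps and gaps as microsecond Longs:
object SessionWindowingSketch {
  def initialSession(tsMicros: Long, gapMicros: Long): Option[(Long, Long)] = {
    val start = tsMicros
    val end = tsMicros + gapMicros
    // Mirrors the rule's filter: sessions with end <= start (zero or negative
    // gap) are dropped; merging overlapping sessions happens downstream in the
    // session-window aggregation, not here.
    if (end > start) Some((start, end)) else None
  }
}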

/**
* Resolve expressions if they contain [[NamePlaceholder]]s.
*/
@@ -639,6 +639,7 @@ object FunctionRegistry {
expression[Year]("year"),
expression[TimeWindow]("window"),
expression[SessionWindow]("session_window"),
expression[WindowTime]("window_time"),
expression[MakeDate]("make_date"),
expression[MakeTimestamp]("make_timestamp"),
// We keep the 2 expression builders below to have different function docs.
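
// Registering WindowTime under the name "window_time" also makes the function
// callable from SQL. A sketch, assuming an active SparkSession `spark` and a
// temp view `agg` exposing a `window` struct column (both names hypothetical):
//   spark.sql("SELECT window_time(window) AS event_time, sum FROM agg")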
